This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/api-draft by this push:
     new 8f710f76 Add spark basic type converter (#1837)
8f710f76 is described below

commit 8f710f76a9a1ddd2793adecaf5611aec27d86666
Author: Wenjun Ruan <[email protected]>
AuthorDate: Tue May 10 08:28:42 2022 +0800

    Add spark basic type converter (#1837)
---
 .../apache/seatunnel/api/table/catalog/Column.java |  28 ++---
 .../table/connector/SupportReadingMetadata.java    |   6 +-
 .../apache/seatunnel/api/table/type/ArrayType.java |   2 +-
 .../apache/seatunnel/api/table/type/BasicType.java |   2 +-
 .../apache/seatunnel/api/table/type/EnumType.java  |   2 +-
 .../apache/seatunnel/api/table/type/ListType.java  |   8 +-
 .../apache/seatunnel/api/table/type/MapType.java   |  12 +-
 .../apache/seatunnel/api/table/type/PojoType.java  |   8 +-
 .../type/{DataType.java => SeaTunnelDataType.java} |   2 +-
 .../seatunnel/api/table/type/TimestampType.java    |   2 +-
 .../flink/types/BasicTypeConverter.java            |   2 +-
 .../flink/types/FlinkTypeConverter.java            |   8 +-
 .../flink/utils/TypeConverterUtils.java            |  16 ++-
 .../flink/types/BasicTypeConverterTest.java        |   2 +-
 .../flink/types/PojoTypeConverterTest.java         |   4 +-
 .../seatunnel-translation-spark/pom.xml            |   6 +
 .../serialization/SparkRowSerialization.java}      |  34 +++---
 .../spark/types/ArrayTypeConverter.java}           |  27 ++---
 .../spark/types/BasicTypeConverter.java            | 112 ++++++++++++++++++
 .../translation/spark/types/PojoTypeConverter.java |  16 ++-
 .../spark/types/SparkDataTypeConverter.java        |  18 +++
 .../spark/types/TimestampTypeConverter.java}       |  32 +++---
 .../spark/utils/TypeConverterUtils.java            | 127 +++++++++++++++++++++
 23 files changed, 369 insertions(+), 107 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
index 39304916..c1f5ed78 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
@@ -18,7 +18,7 @@
 
 package org.apache.seatunnel.api.table.catalog;
 
-import org.apache.seatunnel.api.table.type.DataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 
 import java.util.Objects;
 import java.util.Optional;
@@ -34,11 +34,11 @@ public abstract class Column {
     /**
      * Data type of the column.
      */
-    protected final DataType<?> dataType;
+    protected final SeaTunnelDataType<?> dataType;
 
     protected final String comment;
 
-    private Column(String name, DataType<?> dataType, String comment) {
+    private Column(String name, SeaTunnelDataType<?> dataType, String comment) 
{
         this.name = name;
         this.dataType = dataType;
         this.comment = comment;
@@ -47,7 +47,7 @@ public abstract class Column {
     /**
      * Creates a regular table column that represents physical data.
      */
-    public static PhysicalColumn physical(String name, DataType<?> dataType) {
+    public static PhysicalColumn physical(String name, SeaTunnelDataType<?> 
dataType) {
         return new PhysicalColumn(name, dataType);
     }
 
@@ -58,7 +58,7 @@ public abstract class Column {
      * <p>Allows to specify whether the column is virtual or not.
      */
     public static MetadataColumn metadata(
-            String name, DataType<?> dataType, String metadataKey) {
+        String name, SeaTunnelDataType<?> dataType, String metadataKey) {
         return new MetadataColumn(name, dataType, metadataKey);
     }
 
@@ -76,7 +76,7 @@ public abstract class Column {
     /**
      * Returns the data type of this column.
      */
-    public DataType<?> getDataType() {
+    public SeaTunnelDataType<?> getDataType() {
         return this.dataType;
     }
 
@@ -95,9 +95,9 @@ public abstract class Column {
     }
 
     /**
-     * Returns a copy of the column with a replaced {@link DataType}.
+     * Returns a copy of the column with a replaced {@link SeaTunnelDataType}.
      */
-    public abstract Column copy(DataType<?> newType);
+    public abstract Column copy(SeaTunnelDataType<?> newType);
 
     @Override
     public boolean equals(Object o) {
@@ -127,11 +127,11 @@ public abstract class Column {
      */
     public static final class PhysicalColumn extends Column {
 
-        private PhysicalColumn(String name, DataType<?> dataType) {
+        private PhysicalColumn(String name, SeaTunnelDataType<?> dataType) {
             this(name, dataType, null);
         }
 
-        private PhysicalColumn(String name, DataType<?> dataType, String 
comment) {
+        private PhysicalColumn(String name, SeaTunnelDataType<?> dataType, 
String comment) {
             super(name, dataType, comment);
         }
 
@@ -149,7 +149,7 @@ public abstract class Column {
         }
 
         @Override
-        public Column copy(DataType<?> newDataType) {
+        public Column copy(SeaTunnelDataType<?> newDataType) {
             return new PhysicalColumn(name, newDataType, comment);
         }
     }
@@ -162,13 +162,13 @@ public abstract class Column {
         private final String metadataKey;
 
         private MetadataColumn(
-                String name, DataType<?> dataType, String metadataKey) {
+            String name, SeaTunnelDataType<?> dataType, String metadataKey) {
             this(name, dataType, metadataKey, null);
         }
 
         private MetadataColumn(
                 String name,
-                DataType<?> dataType,
+                SeaTunnelDataType<?> dataType,
                 String metadataKey,
                 String comment) {
             super(name, dataType, comment);
@@ -193,7 +193,7 @@ public abstract class Column {
         }
 
         @Override
-        public Column copy(DataType<?> newDataType) {
+        public Column copy(SeaTunnelDataType<?> newDataType) {
             return new MetadataColumn(name, newDataType, metadataKey, comment);
         }
 
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/SupportReadingMetadata.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/SupportReadingMetadata.java
index 66188580..fa765cc0 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/SupportReadingMetadata.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/SupportReadingMetadata.java
@@ -18,7 +18,7 @@
 package org.apache.seatunnel.api.table.connector;
 
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.type.DataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 
 import java.util.List;
 import java.util.Map;
@@ -28,7 +28,7 @@ import java.util.Map;
  */
 public interface SupportReadingMetadata {
 
-    Map<String, DataType<?>> listReadableMetadata(CatalogTable catalogTable);
+    Map<String, SeaTunnelDataType<?>> listReadableMetadata(CatalogTable 
catalogTable);
 
-    void applyReadableMetadata(CatalogTable catalogTable, List<String> 
metadataKeys, DataType<?> dataType);
+    void applyReadableMetadata(CatalogTable catalogTable, List<String> 
metadataKeys, SeaTunnelDataType<?> dataType);
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ArrayType.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ArrayType.java
index 1e24cd25..e057c997 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ArrayType.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ArrayType.java
@@ -1,6 +1,6 @@
 package org.apache.seatunnel.api.table.type;
 
-public class ArrayType<T> implements DataType<T> {
+public class ArrayType<T> implements SeaTunnelDataType<T> {
 
     private final BasicType<T> elementType;
 
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BasicType.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BasicType.java
index c2834f7e..adfdd517 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BasicType.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BasicType.java
@@ -22,7 +22,7 @@ import java.math.BigInteger;
 import java.time.Instant;
 import java.util.Date;
 
-public class BasicType<T> implements DataType<T> {
+public class BasicType<T> implements SeaTunnelDataType<T> {
 
     public static final BasicType<Boolean> BOOLEAN = new 
BasicType<>(Boolean.class);
     public static final BasicType<String> STRING = new 
BasicType<>(String.class);
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/EnumType.java 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/EnumType.java
index 9b4f8517..3f549592 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/EnumType.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/EnumType.java
@@ -1,6 +1,6 @@
 package org.apache.seatunnel.api.table.type;
 
-public class EnumType<T extends Enum<T>> implements DataType<T> {
+public class EnumType<T extends Enum<T>> implements SeaTunnelDataType<T> {
     private final Class<T> enumClass;
 
     public EnumType(Class<T> enumClass) {
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ListType.java 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ListType.java
index 41c57004..b921e03e 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ListType.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ListType.java
@@ -17,15 +17,15 @@
 
 package org.apache.seatunnel.api.table.type;
 
-public class ListType<T> implements DataType<T> {
+public class ListType<T> implements SeaTunnelDataType<T> {
 
-    private final DataType<T> elementType;
+    private final SeaTunnelDataType<T> elementType;
 
-    public ListType(DataType<T> elementType) {
+    public ListType(SeaTunnelDataType<T> elementType) {
         this.elementType = elementType;
     }
 
-    public DataType<T> getElementType() {
+    public SeaTunnelDataType<T> getElementType() {
         return elementType;
     }
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/MapType.java 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/MapType.java
index 6c8c24b7..6a58eb30 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/MapType.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/MapType.java
@@ -2,12 +2,12 @@ package org.apache.seatunnel.api.table.type;
 
 import java.util.Map;
 
-public class MapType<K, V> implements DataType<Map<K, V>> {
+public class MapType<K, V> implements SeaTunnelDataType<Map<K, V>> {
 
-    private final DataType<K> keyType;
-    private final DataType<V> valueType;
+    private final SeaTunnelDataType<K> keyType;
+    private final SeaTunnelDataType<V> valueType;
 
-    public MapType(DataType<K> keyType, DataType<V> valueType) {
+    public MapType(SeaTunnelDataType<K> keyType, SeaTunnelDataType<V> 
valueType) {
         if (keyType == null) {
             throw new IllegalArgumentException("keyType cannot be null");
         }
@@ -18,11 +18,11 @@ public class MapType<K, V> implements DataType<Map<K, V>> {
         this.valueType = valueType;
     }
 
-    public DataType<K> getKeyType() {
+    public SeaTunnelDataType<K> getKeyType() {
         return keyType;
     }
 
-    public DataType<V> getValueType() {
+    public SeaTunnelDataType<V> getValueType() {
         return valueType;
     }
 
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/PojoType.java 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/PojoType.java
index 2892d011..e6af968f 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/PojoType.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/PojoType.java
@@ -19,13 +19,13 @@ package org.apache.seatunnel.api.table.type;
 
 import java.lang.reflect.Field;
 
-public class PojoType<T> implements DataType<T> {
+public class PojoType<T> implements SeaTunnelDataType<T> {
 
     private final Class<T> pojoClass;
     private final Field[] fields;
-    private final DataType<?>[] fieldTypes;
+    private final SeaTunnelDataType<?>[] fieldTypes;
 
-    public PojoType(Class<T> pojoClass, Field[] fields, DataType<?>[] 
fieldTypes) {
+    public PojoType(Class<T> pojoClass, Field[] fields, SeaTunnelDataType<?>[] 
fieldTypes) {
         this.pojoClass = pojoClass;
         this.fields = fields;
         this.fieldTypes = fieldTypes;
@@ -39,7 +39,7 @@ public class PojoType<T> implements DataType<T> {
         return fields;
     }
 
-    public DataType<?>[] getFieldTypes() {
+    public SeaTunnelDataType<?>[] getFieldTypes() {
         return fieldTypes;
     }
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DataType.java 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelDataType.java
similarity index 95%
rename from 
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DataType.java
rename to 
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelDataType.java
index 2183ad18..d25361f1 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DataType.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelDataType.java
@@ -20,7 +20,7 @@ package org.apache.seatunnel.api.table.type;
 /**
  * Logic data type of column in SeaTunnel.
  */
-public interface DataType<T> {
+public interface SeaTunnelDataType<T> {
 
 
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/TimestampType.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/TimestampType.java
index 390ce453..b15d78fb 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/TimestampType.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/TimestampType.java
@@ -2,7 +2,7 @@ package org.apache.seatunnel.api.table.type;
 
 import java.sql.Timestamp;
 
-public class TimestampType implements DataType<Timestamp> {
+public class TimestampType implements SeaTunnelDataType<Timestamp> {
 
     private final int precision;
 
diff --git 
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/BasicTypeConverter.java
 
b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/BasicTypeConverter.java
index ed8967f8..e379076c 100644
--- 
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/BasicTypeConverter.java
+++ 
b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/BasicTypeConverter.java
@@ -85,7 +85,7 @@ public class BasicTypeConverter<T1>
             BasicType.BIG_INTEGER,
             BasicTypeInfo.BIG_INT_TYPE_INFO);
 
-    public static final BasicTypeConverter<BigDecimal> BIG_DECIMAL =
+    public static final BasicTypeConverter<BigDecimal> BIG_DECIMAL_CONVERTER =
         new BasicTypeConverter<>(
             BasicType.BIG_DECIMAL,
             BasicTypeInfo.BIG_DEC_TYPE_INFO);
diff --git 
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/FlinkTypeConverter.java
 
b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/FlinkTypeConverter.java
index f5912f36..8e46e9e1 100644
--- 
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/FlinkTypeConverter.java
+++ 
b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/FlinkTypeConverter.java
@@ -18,19 +18,19 @@
 package org.apache.seatunnel.translation.flink.types;
 
 import org.apache.seatunnel.api.table.type.Converter;
-import org.apache.seatunnel.api.table.type.DataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 
 /**
- * Convert SeaTunnel {@link DataType} to flink type.
+ * Convert SeaTunnel {@link SeaTunnelDataType} to flink type.
  */
 public interface FlinkTypeConverter<T1, T2> extends Converter<T1, T2> {
 
     /**
-     * Convert SeaTunnel {@link DataType} to flink {@link  TypeInformation}.
+     * Convert SeaTunnel {@link SeaTunnelDataType} to flink {@link  
TypeInformation}.
      *
-     * @param seaTunnelDataType SeaTunnel {@link DataType}
+     * @param seaTunnelDataType SeaTunnel {@link SeaTunnelDataType}
      * @return flink {@link TypeInformation}
      */
     @Override
diff --git 
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java
 
b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java
index ae155306..6341fcdb 100644
--- 
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java
+++ 
b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java
@@ -19,11 +19,11 @@ package org.apache.seatunnel.translation.flink.utils;
 
 import org.apache.seatunnel.api.table.type.ArrayType;
 import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.DataType;
 import org.apache.seatunnel.api.table.type.EnumType;
 import org.apache.seatunnel.api.table.type.ListType;
 import org.apache.seatunnel.api.table.type.MapType;
 import org.apache.seatunnel.api.table.type.PojoType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.TimestampType;
 import org.apache.seatunnel.translation.flink.types.ArrayTypeConverter;
 import org.apache.seatunnel.translation.flink.types.BasicTypeConverter;
@@ -43,8 +43,12 @@ import java.util.Date;
 
 public class TypeConverterUtils {
 
+    private TypeConverterUtils() {
+        throw new UnsupportedOperationException("TypeConverterUtils is a 
utility class and cannot be instantiated");
+    }
+
     @SuppressWarnings("unchecked")
-    public static <T1, T2> TypeInformation<T2> convertType(DataType<T1> 
dataType) {
+    public static <T1, T2> TypeInformation<T2> 
convertType(SeaTunnelDataType<T1> dataType) {
         if (dataType instanceof BasicType) {
             return (TypeInformation<T2>) convertBasicType((BasicType<T1>) 
dataType);
         }
@@ -132,7 +136,7 @@ public class TypeConverterUtils {
         if (physicalTypeClass == BigDecimal.class) {
             BasicType<BigDecimal> bigDecimalBasicType = 
(BasicType<BigDecimal>) basicType;
             return (TypeInformation<T>)
-                BasicTypeConverter.BIG_DECIMAL.convert(bigDecimalBasicType);
+                
BasicTypeConverter.BIG_DECIMAL_CONVERTER.convert(bigDecimalBasicType);
         }
         if (physicalTypeClass == Void.class) {
             BasicType<Void> voidBasicType = (BasicType<Void>) basicType;
@@ -148,7 +152,7 @@ public class TypeConverterUtils {
     }
 
     public static <T> ListTypeInfo<T> covertListType(ListType<T> listType) {
-        DataType<T> elementType = listType.getElementType();
+        SeaTunnelDataType<T> elementType = listType.getElementType();
         return new ListTypeInfo<>(convertType(elementType));
     }
 
@@ -158,8 +162,8 @@ public class TypeConverterUtils {
     }
 
     public static <K, V> MapTypeInfo<K, V> convertMapType(MapType<K, V> 
mapType) {
-        DataType<K> keyType = mapType.getKeyType();
-        DataType<V> valueType = mapType.getValueType();
+        SeaTunnelDataType<K> keyType = mapType.getKeyType();
+        SeaTunnelDataType<V> valueType = mapType.getValueType();
         return new MapTypeInfo<>(convertType(keyType), convertType(valueType));
     }
 
diff --git 
a/seatunnel-translation/seatunnel-translation-flink/src/test/java/org/apache/seatunnel/translation/flink/types/BasicTypeConverterTest.java
 
b/seatunnel-translation/seatunnel-translation-flink/src/test/java/org/apache/seatunnel/translation/flink/types/BasicTypeConverterTest.java
index 2d7b1448..165196e9 100644
--- 
a/seatunnel-translation/seatunnel-translation-flink/src/test/java/org/apache/seatunnel/translation/flink/types/BasicTypeConverterTest.java
+++ 
b/seatunnel-translation/seatunnel-translation-flink/src/test/java/org/apache/seatunnel/translation/flink/types/BasicTypeConverterTest.java
@@ -123,7 +123,7 @@ public class BasicTypeConverterTest {
     public void convertBigDecimalType() {
         BasicType<BigDecimal> bigDecimalBasicType = BasicType.BIG_DECIMAL;
         TypeInformation<BigDecimal> bigDecimalTypeInformation =
-            BasicTypeConverter.BIG_DECIMAL.convert(bigDecimalBasicType);
+            
BasicTypeConverter.BIG_DECIMAL_CONVERTER.convert(bigDecimalBasicType);
         Assert.assertEquals(BasicTypeInfo.BIG_DEC_TYPE_INFO, 
bigDecimalTypeInformation);
     }
 
diff --git 
a/seatunnel-translation/seatunnel-translation-flink/src/test/java/org/apache/seatunnel/translation/flink/types/PojoTypeConverterTest.java
 
b/seatunnel-translation/seatunnel-translation-flink/src/test/java/org/apache/seatunnel/translation/flink/types/PojoTypeConverterTest.java
index 778011c9..33caaf29 100644
--- 
a/seatunnel-translation/seatunnel-translation-flink/src/test/java/org/apache/seatunnel/translation/flink/types/PojoTypeConverterTest.java
+++ 
b/seatunnel-translation/seatunnel-translation-flink/src/test/java/org/apache/seatunnel/translation/flink/types/PojoTypeConverterTest.java
@@ -18,9 +18,9 @@
 package org.apache.seatunnel.translation.flink.types;
 
 import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.DataType;
 import org.apache.seatunnel.api.table.type.ListType;
 import org.apache.seatunnel.api.table.type.PojoType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -36,7 +36,7 @@ public class PojoTypeConverterTest {
     public void convert() {
         PojoTypeConverter<MockPojo> pojoTypeConverter = new 
PojoTypeConverter<>();
         Field[] fields = MockPojo.class.getDeclaredFields();
-        DataType<?>[] fieldTypes = {BasicType.STRING, new 
ListType<>(BasicType.INTEGER)};
+        SeaTunnelDataType<?>[] fieldTypes = {BasicType.STRING, new 
ListType<>(BasicType.INTEGER)};
         PojoTypeInfo<MockPojo> pojoTypeInfo =
             pojoTypeConverter.convert(new PojoType<>(MockPojo.class, fields, 
fieldTypes));
         Assert.assertEquals(
diff --git a/seatunnel-translation/seatunnel-translation-spark/pom.xml 
b/seatunnel-translation/seatunnel-translation-spark/pom.xml
index 806c0df6..948729d0 100644
--- a/seatunnel-translation/seatunnel-translation-spark/pom.xml
+++ b/seatunnel-translation/seatunnel-translation-spark/pom.xml
@@ -34,5 +34,11 @@
             <artifactId>seatunnel-translation-base</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <!-- apache spark table -->
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_${scala.binary.version}</artifactId>
+            <scope>provided</scope>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git 
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/FlinkTypeConverter.java
 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/serialization/SparkRowSerialization.java
similarity index 50%
copy from 
seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/FlinkTypeConverter.java
copy to 
seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/serialization/SparkRowSerialization.java
index f5912f36..5eb635c6 100644
--- 
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/FlinkTypeConverter.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/serialization/SparkRowSerialization.java
@@ -15,25 +15,29 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.translation.flink.types;
+package org.apache.seatunnel.translation.spark.serialization;
 
-import org.apache.seatunnel.api.table.type.Converter;
-import org.apache.seatunnel.api.table.type.DataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.translation.serialization.RowSerialization;
 
-import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
 
-/**
- * Convert SeaTunnel {@link DataType} to flink type.
- */
-public interface FlinkTypeConverter<T1, T2> extends Converter<T1, T2> {
+import java.io.IOException;
+
+public class SparkRowSerialization implements RowSerialization<Row> {
 
-    /**
-     * Convert SeaTunnel {@link DataType} to flink {@link  TypeInformation}.
-     *
-     * @param seaTunnelDataType SeaTunnel {@link DataType}
-     * @return flink {@link TypeInformation}
-     */
     @Override
-    T2 convert(T1 seaTunnelDataType);
+    public Row serialize(SeaTunnelRow seaTunnelRow) throws IOException {
+        return RowFactory.create(seaTunnelRow.getFields());
+    }
 
+    @Override
+    public SeaTunnelRow deserialize(Row engineRow) throws IOException {
+        Object[] fields = new Object[engineRow.length()];
+        for (int i = 0; i < engineRow.length(); i++) {
+            fields[i] = engineRow.get(i);
+        }
+        return new SeaTunnelRow(fields);
+    }
 }
diff --git 
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/FlinkTypeConverter.java
 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/ArrayTypeConverter.java
similarity index 56%
copy from 
seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/FlinkTypeConverter.java
copy to 
seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/ArrayTypeConverter.java
index f5912f36..71f0820d 100644
--- 
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/FlinkTypeConverter.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/ArrayTypeConverter.java
@@ -15,25 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.translation.flink.types;
+package org.apache.seatunnel.translation.spark.types;
 
-import org.apache.seatunnel.api.table.type.Converter;
-import org.apache.seatunnel.api.table.type.DataType;
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.translation.spark.utils.TypeConverterUtils;
 
-import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
 
-/**
- * Convert SeaTunnel {@link DataType} to flink type.
- */
-public interface FlinkTypeConverter<T1, T2> extends Converter<T1, T2> {
+public class ArrayTypeConverter<T1>
+    implements SparkDataTypeConverter<ArrayType<T1>, 
org.apache.spark.sql.types.ArrayType> {
 
-    /**
-     * Convert SeaTunnel {@link DataType} to flink {@link  TypeInformation}.
-     *
-     * @param seaTunnelDataType SeaTunnel {@link DataType}
-     * @return flink {@link TypeInformation}
-     */
     @Override
-    T2 convert(T1 seaTunnelDataType);
-
+    public org.apache.spark.sql.types.ArrayType convert(ArrayType<T1> 
seaTunnelDataType) {
+        DataType elementType = 
TypeConverterUtils.convert(seaTunnelDataType.getElementType());
+        return DataTypes.createArrayType(elementType);
+    }
 }
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/BasicTypeConverter.java
 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/BasicTypeConverter.java
new file mode 100644
index 00000000..19615094
--- /dev/null
+++ 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/BasicTypeConverter.java
@@ -0,0 +1,112 @@
+package org.apache.seatunnel.translation.spark.types;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+
+import org.apache.spark.sql.types.CharType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.DecimalType;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.time.Instant;
+import java.util.Date;
+
+public class BasicTypeConverter<T1>
+    implements SparkDataTypeConverter<BasicType<T1>, DataType> {
+
+    public static final BasicTypeConverter<String> STRING_CONVERTER =
+        new BasicTypeConverter<>(
+            BasicType.STRING,
+            DataTypes.StringType
+        );
+
+    public static final BasicTypeConverter<Integer> INTEGER_CONVERTER =
+        new BasicTypeConverter<>(
+            BasicType.INTEGER,
+            DataTypes.IntegerType
+        );
+
+    public static final BasicTypeConverter<Boolean> BOOLEAN_CONVERTER =
+        new BasicTypeConverter<>(
+            BasicType.BOOLEAN,
+            DataTypes.BooleanType);
+
+    public static final BasicTypeConverter<Date> DATE_CONVERTER =
+        new BasicTypeConverter<>(
+            BasicType.DATE,
+            DataTypes.DateType
+        );
+
+    public static final BasicTypeConverter<Double> DOUBLE_CONVERTER =
+        new BasicTypeConverter<>(
+            BasicType.DOUBLE,
+            DataTypes.DoubleType
+        );
+
+    public static final BasicTypeConverter<Long> LONG_CONVERTER =
+        new BasicTypeConverter<>(
+            BasicType.LONG,
+            DataTypes.LongType
+        );
+
+    public static final BasicTypeConverter<Float> FLOAT_CONVERTER =
+        new BasicTypeConverter<>(
+            BasicType.FLOAT,
+            DataTypes.FloatType);
+
+    public static final BasicTypeConverter<Byte> BYTE_CONVERTER =
+        new BasicTypeConverter<>(
+            BasicType.BYTE,
+            DataTypes.ByteType
+        );
+
+    public static final BasicTypeConverter<Short> SHORT_CONVERTER =
+        new BasicTypeConverter<>(
+            BasicType.SHORT,
+            DataTypes.ShortType);
+
+    // TODO: confirm that Spark's CharType(1) is the correct mapping for BasicType.CHARACTER
+    public static final BasicTypeConverter<Character> CHARACTER_CONVERTER =
+        new BasicTypeConverter<>(
+            BasicType.CHARACTER,
+            new CharType(1)
+        );
+
+    public static final BasicTypeConverter<BigInteger> BIG_INTEGER_CONVERTER =
+        new BasicTypeConverter<>(
+            BasicType.BIG_INTEGER,
+            DataTypes.LongType
+        );
+
+    public static final BasicTypeConverter<BigDecimal> BID_DECIMAL_CONVERTER =
+        new BasicTypeConverter<>(
+            BasicType.BIG_DECIMAL,
+            new DecimalType()
+        );
+
+    public static final BasicTypeConverter<Instant> INSTANT_CONVERTER =
+        new BasicTypeConverter<>(
+            BasicType.INSTANT,
+            DataTypes.TimestampType
+        );
+
+    public static final BasicTypeConverter<Void> NULL_CONVERTER =
+        new BasicTypeConverter<>(
+            BasicType.NULL,
+            DataTypes.NullType
+        );
+
+    private final BasicType<T1> seatunnelDataType;
+    private final DataType sparkDataType;
+
+    public BasicTypeConverter(BasicType<T1> seatunnelDataType, DataType 
sparkDataType) {
+        this.seatunnelDataType = seatunnelDataType;
+        this.sparkDataType = sparkDataType;
+    }
+
+    @Override
+    public DataType convert(BasicType<T1> seaTunnelDataType) {
+        return sparkDataType;
+    }
+}
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ListType.java 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/PojoTypeConverter.java
similarity index 67%
copy from 
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ListType.java
copy to 
seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/PojoTypeConverter.java
index 41c57004..4c757151 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ListType.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/PojoTypeConverter.java
@@ -15,17 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.api.table.type;
+package org.apache.seatunnel.translation.spark.types;
 
-public class ListType<T> implements DataType<T> {
+import org.apache.seatunnel.api.table.type.PojoType;
 
-    private final DataType<T> elementType;
+import org.apache.spark.sql.types.ObjectType;
 
-    public ListType(DataType<T> elementType) {
-        this.elementType = elementType;
-    }
-
-    public DataType<T> getElementType() {
-        return elementType;
+public class PojoTypeConverter<T1> implements 
SparkDataTypeConverter<PojoType<T1>, ObjectType> {
+    @Override
+    public ObjectType convert(PojoType<T1> seaTunnelDataType) {
+        return new ObjectType(seaTunnelDataType.getPojoClass());
     }
 }
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/SparkDataTypeConverter.java
 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/SparkDataTypeConverter.java
new file mode 100644
index 00000000..c7e0018b
--- /dev/null
+++ 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/SparkDataTypeConverter.java
@@ -0,0 +1,18 @@
+package org.apache.seatunnel.translation.spark.types;
+
+import org.apache.seatunnel.api.table.type.Converter;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+
+import org.apache.spark.sql.types.DataType;
+
+public interface SparkDataTypeConverter<T1, T2> extends Converter<T1, T2> {
+
+    /**
+     * Convert SeaTunnel {@link SeaTunnelDataType} to Spark {@link DataType}.
+     *
+     * @param seaTunnelDataType SeaTunnel {@link SeaTunnelDataType}
+     * @return Spark {@link DataType}
+     */
+    @Override
+    T2 convert(T1 seaTunnelDataType);
+}
diff --git 
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/FlinkTypeConverter.java
 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/TimestampTypeConverter.java
similarity index 57%
copy from 
seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/FlinkTypeConverter.java
copy to 
seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/TimestampTypeConverter.java
index f5912f36..fd6540af 100644
--- 
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/FlinkTypeConverter.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/TimestampTypeConverter.java
@@ -15,25 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.translation.flink.types;
+package org.apache.seatunnel.translation.spark.types;
 
-import org.apache.seatunnel.api.table.type.Converter;
-import org.apache.seatunnel.api.table.type.DataType;
+import org.apache.seatunnel.api.table.type.TimestampType;
 
-import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.spark.sql.types.DataTypes;
 
-/**
- * Convert SeaTunnel {@link DataType} to flink type.
- */
-public interface FlinkTypeConverter<T1, T2> extends Converter<T1, T2> {
-
-    /**
-     * Convert SeaTunnel {@link DataType} to flink {@link  TypeInformation}.
-     *
-     * @param seaTunnelDataType SeaTunnel {@link DataType}
-     * @return flink {@link TypeInformation}
-     */
-    @Override
-    T2 convert(T1 seaTunnelDataType);
+public class TimestampTypeConverter
+    implements SparkDataTypeConverter<TimestampType, 
org.apache.spark.sql.types.TimestampType> {
+
+    public static final TimestampTypeConverter INSTANCE = new 
TimestampTypeConverter();
 
+    private TimestampTypeConverter() {
+
+    }
+
+    @Override
+    public org.apache.spark.sql.types.TimestampType convert(TimestampType 
seaTunnelDataType) {
+        return (org.apache.spark.sql.types.TimestampType) 
DataTypes.TimestampType;
+    }
 }
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/utils/TypeConverterUtils.java
 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/utils/TypeConverterUtils.java
new file mode 100644
index 00000000..932cd22a
--- /dev/null
+++ 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/utils/TypeConverterUtils.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.spark.utils;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.PojoType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.TimestampType;
+import org.apache.seatunnel.translation.spark.types.ArrayTypeConverter;
+import org.apache.seatunnel.translation.spark.types.BasicTypeConverter;
+import org.apache.seatunnel.translation.spark.types.PojoTypeConverter;
+import org.apache.seatunnel.translation.spark.types.TimestampTypeConverter;
+
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.ObjectType;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.Date;
+
+public class TypeConverterUtils {
+
+    private TypeConverterUtils() {
+        throw new UnsupportedOperationException("TypeConverterUtils is a 
utility class and cannot be instantiated");
+    }
+
+    public static <T> DataType convert(SeaTunnelDataType<T> seaTunnelDataType) 
{
+        if (seaTunnelDataType instanceof BasicType) {
+            return convertBasicType((BasicType<T>) seaTunnelDataType);
+        }
+        if (seaTunnelDataType instanceof TimestampType) {
+            return TimestampTypeConverter.INSTANCE.convert((TimestampType) 
seaTunnelDataType);
+        }
+        if (seaTunnelDataType instanceof ArrayType) {
+            return convertArrayType((ArrayType<T>) seaTunnelDataType);
+        }
+        if (seaTunnelDataType instanceof PojoType) {
+            return convertPojoType((PojoType<T>) seaTunnelDataType);
+        }
+
+        throw new IllegalArgumentException("Unsupported data type: " + 
seaTunnelDataType);
+    }
+
+    @SuppressWarnings("unchecked")
+    public static <T> DataType convertBasicType(BasicType<T> basicType) {
+        Class<T> physicalTypeClass = basicType.getPhysicalTypeClass();
+        if (physicalTypeClass == Boolean.class) {
+            BasicType<Boolean> booleanBasicType = (BasicType<Boolean>) 
basicType;
+            return 
BasicTypeConverter.BOOLEAN_CONVERTER.convert(booleanBasicType);
+        }
+        if (physicalTypeClass == String.class) {
+            BasicType<String> stringBasicType = (BasicType<String>) basicType;
+            return 
BasicTypeConverter.STRING_CONVERTER.convert(stringBasicType);
+        }
+        if (physicalTypeClass == Date.class) {
+            BasicType<Date> dateBasicType = (BasicType<Date>) basicType;
+            return BasicTypeConverter.DATE_CONVERTER.convert(dateBasicType);
+        }
+        if (physicalTypeClass == Double.class) {
+            BasicType<Double> doubleBasicType = (BasicType<Double>) basicType;
+            return 
BasicTypeConverter.DOUBLE_CONVERTER.convert(doubleBasicType);
+        }
+        if (physicalTypeClass == Integer.class) {
+            BasicType<Integer> integerBasicType = (BasicType<Integer>) 
basicType;
+            return 
BasicTypeConverter.INTEGER_CONVERTER.convert(integerBasicType);
+        }
+        if (physicalTypeClass == Long.class) {
+            BasicType<Long> longBasicType = (BasicType<Long>) basicType;
+            return BasicTypeConverter.LONG_CONVERTER.convert(longBasicType);
+        }
+        if (physicalTypeClass == Float.class) {
+            BasicType<Float> floatBasicType = (BasicType<Float>) basicType;
+            return BasicTypeConverter.FLOAT_CONVERTER.convert(floatBasicType);
+        }
+        if (physicalTypeClass == Byte.class) {
+            BasicType<Byte> byteBasicType = (BasicType<Byte>) basicType;
+            return BasicTypeConverter.BYTE_CONVERTER.convert(byteBasicType);
+        }
+        if (physicalTypeClass == Short.class) {
+            BasicType<Short> shortBasicType = (BasicType<Short>) basicType;
+            return BasicTypeConverter.SHORT_CONVERTER.convert(shortBasicType);
+        }
+        if (physicalTypeClass == Character.class) {
+            BasicType<Character> characterBasicType = (BasicType<Character>) 
basicType;
+            return 
BasicTypeConverter.CHARACTER_CONVERTER.convert(characterBasicType);
+        }
+        if (physicalTypeClass == BigInteger.class) {
+            BasicType<BigInteger> bigIntegerBasicType = 
(BasicType<BigInteger>) basicType;
+            return 
BasicTypeConverter.BIG_INTEGER_CONVERTER.convert(bigIntegerBasicType);
+        }
+        if (physicalTypeClass == BigDecimal.class) {
+            BasicType<BigDecimal> bigDecimalBasicType = 
(BasicType<BigDecimal>) basicType;
+            return 
BasicTypeConverter.BID_DECIMAL_CONVERTER.convert(bigDecimalBasicType);
+        }
+        if (physicalTypeClass == Void.class) {
+            BasicType<Void> voidBasicType = (BasicType<Void>) basicType;
+            return BasicTypeConverter.NULL_CONVERTER.convert(voidBasicType);
+        }
+        throw new IllegalArgumentException("Unsupported basic type: " + 
basicType);
+    }
+
+    public static <T1> org.apache.spark.sql.types.ArrayType 
convertArrayType(ArrayType<T1> arrayType) {
+        ArrayTypeConverter<T1> arrayTypeConverter = new ArrayTypeConverter<>();
+        return arrayTypeConverter.convert(arrayType);
+    }
+
+    public static <T> ObjectType convertPojoType(PojoType<T> pojoType) {
+        PojoTypeConverter<T> pojoTypeConverter = new PojoTypeConverter<>();
+        return pojoTypeConverter.convert(pojoType);
+    }
+}

Reply via email to