This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/api-draft by this push:
new 8f710f76 Add spark basic type converter (#1837)
8f710f76 is described below
commit 8f710f76a9a1ddd2793adecaf5611aec27d86666
Author: Wenjun Ruan <[email protected]>
AuthorDate: Tue May 10 08:28:42 2022 +0800
Add spark basic type converter (#1837)
---
.../apache/seatunnel/api/table/catalog/Column.java | 28 ++---
.../table/connector/SupportReadingMetadata.java | 6 +-
.../apache/seatunnel/api/table/type/ArrayType.java | 2 +-
.../apache/seatunnel/api/table/type/BasicType.java | 2 +-
.../apache/seatunnel/api/table/type/EnumType.java | 2 +-
.../apache/seatunnel/api/table/type/ListType.java | 8 +-
.../apache/seatunnel/api/table/type/MapType.java | 12 +-
.../apache/seatunnel/api/table/type/PojoType.java | 8 +-
.../type/{DataType.java => SeaTunnelDataType.java} | 2 +-
.../seatunnel/api/table/type/TimestampType.java | 2 +-
.../flink/types/BasicTypeConverter.java | 2 +-
.../flink/types/FlinkTypeConverter.java | 8 +-
.../flink/utils/TypeConverterUtils.java | 16 ++-
.../flink/types/BasicTypeConverterTest.java | 2 +-
.../flink/types/PojoTypeConverterTest.java | 4 +-
.../seatunnel-translation-spark/pom.xml | 6 +
.../serialization/SparkRowSerialization.java} | 34 +++---
.../spark/types/ArrayTypeConverter.java} | 27 ++---
.../spark/types/BasicTypeConverter.java | 112 ++++++++++++++++++
.../translation/spark/types/PojoTypeConverter.java | 16 ++-
.../spark/types/SparkDataTypeConverter.java | 18 +++
.../spark/types/TimestampTypeConverter.java} | 32 +++---
.../spark/utils/TypeConverterUtils.java | 127 +++++++++++++++++++++
23 files changed, 369 insertions(+), 107 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
index 39304916..c1f5ed78 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
@@ -18,7 +18,7 @@
package org.apache.seatunnel.api.table.catalog;
-import org.apache.seatunnel.api.table.type.DataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import java.util.Objects;
import java.util.Optional;
@@ -34,11 +34,11 @@ public abstract class Column {
/**
* Data type of the column.
*/
- protected final DataType<?> dataType;
+ protected final SeaTunnelDataType<?> dataType;
protected final String comment;
- private Column(String name, DataType<?> dataType, String comment) {
+ private Column(String name, SeaTunnelDataType<?> dataType, String comment)
{
this.name = name;
this.dataType = dataType;
this.comment = comment;
@@ -47,7 +47,7 @@ public abstract class Column {
/**
* Creates a regular table column that represents physical data.
*/
- public static PhysicalColumn physical(String name, DataType<?> dataType) {
+ public static PhysicalColumn physical(String name, SeaTunnelDataType<?>
dataType) {
return new PhysicalColumn(name, dataType);
}
@@ -58,7 +58,7 @@ public abstract class Column {
* <p>Allows to specify whether the column is virtual or not.
*/
public static MetadataColumn metadata(
- String name, DataType<?> dataType, String metadataKey) {
+ String name, SeaTunnelDataType<?> dataType, String metadataKey) {
return new MetadataColumn(name, dataType, metadataKey);
}
@@ -76,7 +76,7 @@ public abstract class Column {
/**
* Returns the data type of this column.
*/
- public DataType<?> getDataType() {
+ public SeaTunnelDataType<?> getDataType() {
return this.dataType;
}
@@ -95,9 +95,9 @@ public abstract class Column {
}
/**
- * Returns a copy of the column with a replaced {@link DataType}.
+ * Returns a copy of the column with a replaced {@link SeaTunnelDataType}.
*/
- public abstract Column copy(DataType<?> newType);
+ public abstract Column copy(SeaTunnelDataType<?> newType);
@Override
public boolean equals(Object o) {
@@ -127,11 +127,11 @@ public abstract class Column {
*/
public static final class PhysicalColumn extends Column {
- private PhysicalColumn(String name, DataType<?> dataType) {
+ private PhysicalColumn(String name, SeaTunnelDataType<?> dataType) {
this(name, dataType, null);
}
- private PhysicalColumn(String name, DataType<?> dataType, String
comment) {
+ private PhysicalColumn(String name, SeaTunnelDataType<?> dataType,
String comment) {
super(name, dataType, comment);
}
@@ -149,7 +149,7 @@ public abstract class Column {
}
@Override
- public Column copy(DataType<?> newDataType) {
+ public Column copy(SeaTunnelDataType<?> newDataType) {
return new PhysicalColumn(name, newDataType, comment);
}
}
@@ -162,13 +162,13 @@ public abstract class Column {
private final String metadataKey;
private MetadataColumn(
- String name, DataType<?> dataType, String metadataKey) {
+ String name, SeaTunnelDataType<?> dataType, String metadataKey) {
this(name, dataType, metadataKey, null);
}
private MetadataColumn(
String name,
- DataType<?> dataType,
+ SeaTunnelDataType<?> dataType,
String metadataKey,
String comment) {
super(name, dataType, comment);
@@ -193,7 +193,7 @@ public abstract class Column {
}
@Override
- public Column copy(DataType<?> newDataType) {
+ public Column copy(SeaTunnelDataType<?> newDataType) {
return new MetadataColumn(name, newDataType, metadataKey, comment);
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/SupportReadingMetadata.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/SupportReadingMetadata.java
index 66188580..fa765cc0 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/SupportReadingMetadata.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/SupportReadingMetadata.java
@@ -18,7 +18,7 @@
package org.apache.seatunnel.api.table.connector;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.type.DataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import java.util.List;
import java.util.Map;
@@ -28,7 +28,7 @@ import java.util.Map;
*/
public interface SupportReadingMetadata {
- Map<String, DataType<?>> listReadableMetadata(CatalogTable catalogTable);
+ Map<String, SeaTunnelDataType<?>> listReadableMetadata(CatalogTable
catalogTable);
- void applyReadableMetadata(CatalogTable catalogTable, List<String>
metadataKeys, DataType<?> dataType);
+ void applyReadableMetadata(CatalogTable catalogTable, List<String>
metadataKeys, SeaTunnelDataType<?> dataType);
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ArrayType.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ArrayType.java
index 1e24cd25..e057c997 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ArrayType.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ArrayType.java
@@ -1,6 +1,6 @@
package org.apache.seatunnel.api.table.type;
-public class ArrayType<T> implements DataType<T> {
+public class ArrayType<T> implements SeaTunnelDataType<T> {
private final BasicType<T> elementType;
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BasicType.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BasicType.java
index c2834f7e..adfdd517 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BasicType.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BasicType.java
@@ -22,7 +22,7 @@ import java.math.BigInteger;
import java.time.Instant;
import java.util.Date;
-public class BasicType<T> implements DataType<T> {
+public class BasicType<T> implements SeaTunnelDataType<T> {
public static final BasicType<Boolean> BOOLEAN = new
BasicType<>(Boolean.class);
public static final BasicType<String> STRING = new
BasicType<>(String.class);
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/EnumType.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/EnumType.java
index 9b4f8517..3f549592 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/EnumType.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/EnumType.java
@@ -1,6 +1,6 @@
package org.apache.seatunnel.api.table.type;
-public class EnumType<T extends Enum<T>> implements DataType<T> {
+public class EnumType<T extends Enum<T>> implements SeaTunnelDataType<T> {
private final Class<T> enumClass;
public EnumType(Class<T> enumClass) {
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ListType.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ListType.java
index 41c57004..b921e03e 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ListType.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ListType.java
@@ -17,15 +17,15 @@
package org.apache.seatunnel.api.table.type;
-public class ListType<T> implements DataType<T> {
+public class ListType<T> implements SeaTunnelDataType<T> {
- private final DataType<T> elementType;
+ private final SeaTunnelDataType<T> elementType;
- public ListType(DataType<T> elementType) {
+ public ListType(SeaTunnelDataType<T> elementType) {
this.elementType = elementType;
}
- public DataType<T> getElementType() {
+ public SeaTunnelDataType<T> getElementType() {
return elementType;
}
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/MapType.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/MapType.java
index 6c8c24b7..6a58eb30 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/MapType.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/MapType.java
@@ -2,12 +2,12 @@ package org.apache.seatunnel.api.table.type;
import java.util.Map;
-public class MapType<K, V> implements DataType<Map<K, V>> {
+public class MapType<K, V> implements SeaTunnelDataType<Map<K, V>> {
- private final DataType<K> keyType;
- private final DataType<V> valueType;
+ private final SeaTunnelDataType<K> keyType;
+ private final SeaTunnelDataType<V> valueType;
- public MapType(DataType<K> keyType, DataType<V> valueType) {
+ public MapType(SeaTunnelDataType<K> keyType, SeaTunnelDataType<V>
valueType) {
if (keyType == null) {
throw new IllegalArgumentException("keyType cannot be null");
}
@@ -18,11 +18,11 @@ public class MapType<K, V> implements DataType<Map<K, V>> {
this.valueType = valueType;
}
- public DataType<K> getKeyType() {
+ public SeaTunnelDataType<K> getKeyType() {
return keyType;
}
- public DataType<V> getValueType() {
+ public SeaTunnelDataType<V> getValueType() {
return valueType;
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/PojoType.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/PojoType.java
index 2892d011..e6af968f 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/PojoType.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/PojoType.java
@@ -19,13 +19,13 @@ package org.apache.seatunnel.api.table.type;
import java.lang.reflect.Field;
-public class PojoType<T> implements DataType<T> {
+public class PojoType<T> implements SeaTunnelDataType<T> {
private final Class<T> pojoClass;
private final Field[] fields;
- private final DataType<?>[] fieldTypes;
+ private final SeaTunnelDataType<?>[] fieldTypes;
- public PojoType(Class<T> pojoClass, Field[] fields, DataType<?>[]
fieldTypes) {
+ public PojoType(Class<T> pojoClass, Field[] fields, SeaTunnelDataType<?>[]
fieldTypes) {
this.pojoClass = pojoClass;
this.fields = fields;
this.fieldTypes = fieldTypes;
@@ -39,7 +39,7 @@ public class PojoType<T> implements DataType<T> {
return fields;
}
- public DataType<?>[] getFieldTypes() {
+ public SeaTunnelDataType<?>[] getFieldTypes() {
return fieldTypes;
}
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DataType.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelDataType.java
similarity index 95%
rename from
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DataType.java
rename to
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelDataType.java
index 2183ad18..d25361f1 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DataType.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelDataType.java
@@ -20,7 +20,7 @@ package org.apache.seatunnel.api.table.type;
/**
* Logic data type of column in SeaTunnel.
*/
-public interface DataType<T> {
+public interface SeaTunnelDataType<T> {
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/TimestampType.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/TimestampType.java
index 390ce453..b15d78fb 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/TimestampType.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/TimestampType.java
@@ -2,7 +2,7 @@ package org.apache.seatunnel.api.table.type;
import java.sql.Timestamp;
-public class TimestampType implements DataType<Timestamp> {
+public class TimestampType implements SeaTunnelDataType<Timestamp> {
private final int precision;
diff --git
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/BasicTypeConverter.java
b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/BasicTypeConverter.java
index ed8967f8..e379076c 100644
---
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/BasicTypeConverter.java
+++
b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/BasicTypeConverter.java
@@ -85,7 +85,7 @@ public class BasicTypeConverter<T1>
BasicType.BIG_INTEGER,
BasicTypeInfo.BIG_INT_TYPE_INFO);
- public static final BasicTypeConverter<BigDecimal> BIG_DECIMAL =
+ public static final BasicTypeConverter<BigDecimal> BIG_DECIMAL_CONVERTER =
new BasicTypeConverter<>(
BasicType.BIG_DECIMAL,
BasicTypeInfo.BIG_DEC_TYPE_INFO);
diff --git
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/FlinkTypeConverter.java
b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/FlinkTypeConverter.java
index f5912f36..8e46e9e1 100644
---
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/FlinkTypeConverter.java
+++
b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/FlinkTypeConverter.java
@@ -18,19 +18,19 @@
package org.apache.seatunnel.translation.flink.types;
import org.apache.seatunnel.api.table.type.Converter;
-import org.apache.seatunnel.api.table.type.DataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.flink.api.common.typeinfo.TypeInformation;
/**
- * Convert SeaTunnel {@link DataType} to flink type.
+ * Convert SeaTunnel {@link SeaTunnelDataType} to flink type.
*/
public interface FlinkTypeConverter<T1, T2> extends Converter<T1, T2> {
/**
- * Convert SeaTunnel {@link DataType} to flink {@link TypeInformation}.
+ * Convert SeaTunnel {@link SeaTunnelDataType} to flink {@link
TypeInformation}.
*
- * @param seaTunnelDataType SeaTunnel {@link DataType}
+ * @param seaTunnelDataType SeaTunnel {@link SeaTunnelDataType}
* @return flink {@link TypeInformation}
*/
@Override
diff --git
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java
b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java
index ae155306..6341fcdb 100644
---
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java
+++
b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java
@@ -19,11 +19,11 @@ package org.apache.seatunnel.translation.flink.utils;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.DataType;
import org.apache.seatunnel.api.table.type.EnumType;
import org.apache.seatunnel.api.table.type.ListType;
import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.PojoType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.TimestampType;
import org.apache.seatunnel.translation.flink.types.ArrayTypeConverter;
import org.apache.seatunnel.translation.flink.types.BasicTypeConverter;
@@ -43,8 +43,12 @@ import java.util.Date;
public class TypeConverterUtils {
+ private TypeConverterUtils() {
+ throw new UnsupportedOperationException("TypeConverterUtils is a
utility class and cannot be instantiated");
+ }
+
@SuppressWarnings("unchecked")
- public static <T1, T2> TypeInformation<T2> convertType(DataType<T1>
dataType) {
+ public static <T1, T2> TypeInformation<T2>
convertType(SeaTunnelDataType<T1> dataType) {
if (dataType instanceof BasicType) {
return (TypeInformation<T2>) convertBasicType((BasicType<T1>)
dataType);
}
@@ -132,7 +136,7 @@ public class TypeConverterUtils {
if (physicalTypeClass == BigDecimal.class) {
BasicType<BigDecimal> bigDecimalBasicType =
(BasicType<BigDecimal>) basicType;
return (TypeInformation<T>)
- BasicTypeConverter.BIG_DECIMAL.convert(bigDecimalBasicType);
+
BasicTypeConverter.BIG_DECIMAL_CONVERTER.convert(bigDecimalBasicType);
}
if (physicalTypeClass == Void.class) {
BasicType<Void> voidBasicType = (BasicType<Void>) basicType;
@@ -148,7 +152,7 @@ public class TypeConverterUtils {
}
public static <T> ListTypeInfo<T> covertListType(ListType<T> listType) {
- DataType<T> elementType = listType.getElementType();
+ SeaTunnelDataType<T> elementType = listType.getElementType();
return new ListTypeInfo<>(convertType(elementType));
}
@@ -158,8 +162,8 @@ public class TypeConverterUtils {
}
public static <K, V> MapTypeInfo<K, V> convertMapType(MapType<K, V>
mapType) {
- DataType<K> keyType = mapType.getKeyType();
- DataType<V> valueType = mapType.getValueType();
+ SeaTunnelDataType<K> keyType = mapType.getKeyType();
+ SeaTunnelDataType<V> valueType = mapType.getValueType();
return new MapTypeInfo<>(convertType(keyType), convertType(valueType));
}
diff --git
a/seatunnel-translation/seatunnel-translation-flink/src/test/java/org/apache/seatunnel/translation/flink/types/BasicTypeConverterTest.java
b/seatunnel-translation/seatunnel-translation-flink/src/test/java/org/apache/seatunnel/translation/flink/types/BasicTypeConverterTest.java
index 2d7b1448..165196e9 100644
---
a/seatunnel-translation/seatunnel-translation-flink/src/test/java/org/apache/seatunnel/translation/flink/types/BasicTypeConverterTest.java
+++
b/seatunnel-translation/seatunnel-translation-flink/src/test/java/org/apache/seatunnel/translation/flink/types/BasicTypeConverterTest.java
@@ -123,7 +123,7 @@ public class BasicTypeConverterTest {
public void convertBigDecimalType() {
BasicType<BigDecimal> bigDecimalBasicType = BasicType.BIG_DECIMAL;
TypeInformation<BigDecimal> bigDecimalTypeInformation =
- BasicTypeConverter.BIG_DECIMAL.convert(bigDecimalBasicType);
+
BasicTypeConverter.BIG_DECIMAL_CONVERTER.convert(bigDecimalBasicType);
Assert.assertEquals(BasicTypeInfo.BIG_DEC_TYPE_INFO,
bigDecimalTypeInformation);
}
diff --git
a/seatunnel-translation/seatunnel-translation-flink/src/test/java/org/apache/seatunnel/translation/flink/types/PojoTypeConverterTest.java
b/seatunnel-translation/seatunnel-translation-flink/src/test/java/org/apache/seatunnel/translation/flink/types/PojoTypeConverterTest.java
index 778011c9..33caaf29 100644
---
a/seatunnel-translation/seatunnel-translation-flink/src/test/java/org/apache/seatunnel/translation/flink/types/PojoTypeConverterTest.java
+++
b/seatunnel-translation/seatunnel-translation-flink/src/test/java/org/apache/seatunnel/translation/flink/types/PojoTypeConverterTest.java
@@ -18,9 +18,9 @@
package org.apache.seatunnel.translation.flink.types;
import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.DataType;
import org.apache.seatunnel.api.table.type.ListType;
import org.apache.seatunnel.api.table.type.PojoType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -36,7 +36,7 @@ public class PojoTypeConverterTest {
public void convert() {
PojoTypeConverter<MockPojo> pojoTypeConverter = new
PojoTypeConverter<>();
Field[] fields = MockPojo.class.getDeclaredFields();
- DataType<?>[] fieldTypes = {BasicType.STRING, new
ListType<>(BasicType.INTEGER)};
+ SeaTunnelDataType<?>[] fieldTypes = {BasicType.STRING, new
ListType<>(BasicType.INTEGER)};
PojoTypeInfo<MockPojo> pojoTypeInfo =
pojoTypeConverter.convert(new PojoType<>(MockPojo.class, fields,
fieldTypes));
Assert.assertEquals(
diff --git a/seatunnel-translation/seatunnel-translation-spark/pom.xml
b/seatunnel-translation/seatunnel-translation-spark/pom.xml
index 806c0df6..948729d0 100644
--- a/seatunnel-translation/seatunnel-translation-spark/pom.xml
+++ b/seatunnel-translation/seatunnel-translation-spark/pom.xml
@@ -34,5 +34,11 @@
<artifactId>seatunnel-translation-base</artifactId>
<version>${project.version}</version>
</dependency>
+ <!-- apache spark table -->
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_${scala.binary.version}</artifactId>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/FlinkTypeConverter.java
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/serialization/SparkRowSerialization.java
similarity index 50%
copy from
seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/FlinkTypeConverter.java
copy to
seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/serialization/SparkRowSerialization.java
index f5912f36..5eb635c6 100644
---
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/FlinkTypeConverter.java
+++
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/serialization/SparkRowSerialization.java
@@ -15,25 +15,29 @@
* limitations under the License.
*/
-package org.apache.seatunnel.translation.flink.types;
+package org.apache.seatunnel.translation.spark.serialization;
-import org.apache.seatunnel.api.table.type.Converter;
-import org.apache.seatunnel.api.table.type.DataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.translation.serialization.RowSerialization;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
-/**
- * Convert SeaTunnel {@link DataType} to flink type.
- */
-public interface FlinkTypeConverter<T1, T2> extends Converter<T1, T2> {
+import java.io.IOException;
+
+public class SparkRowSerialization implements RowSerialization<Row> {
- /**
- * Convert SeaTunnel {@link DataType} to flink {@link TypeInformation}.
- *
- * @param seaTunnelDataType SeaTunnel {@link DataType}
- * @return flink {@link TypeInformation}
- */
@Override
- T2 convert(T1 seaTunnelDataType);
+ public Row serialize(SeaTunnelRow seaTunnelRow) throws IOException {
+ return RowFactory.create(seaTunnelRow.getFields());
+ }
+ @Override
+ public SeaTunnelRow deserialize(Row engineRow) throws IOException {
+ Object[] fields = new Object[engineRow.length()];
+ for (int i = 0; i < engineRow.length(); i++) {
+ fields[i] = engineRow.get(i);
+ }
+ return new SeaTunnelRow(fields);
+ }
}
diff --git
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/FlinkTypeConverter.java
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/ArrayTypeConverter.java
similarity index 56%
copy from
seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/FlinkTypeConverter.java
copy to
seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/ArrayTypeConverter.java
index f5912f36..71f0820d 100644
---
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/FlinkTypeConverter.java
+++
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/ArrayTypeConverter.java
@@ -15,25 +15,20 @@
* limitations under the License.
*/
-package org.apache.seatunnel.translation.flink.types;
+package org.apache.seatunnel.translation.spark.types;
-import org.apache.seatunnel.api.table.type.Converter;
-import org.apache.seatunnel.api.table.type.DataType;
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.translation.spark.utils.TypeConverterUtils;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
-/**
- * Convert SeaTunnel {@link DataType} to flink type.
- */
-public interface FlinkTypeConverter<T1, T2> extends Converter<T1, T2> {
+public class ArrayTypeConverter<T1>
+ implements SparkDataTypeConverter<ArrayType<T1>,
org.apache.spark.sql.types.ArrayType> {
- /**
- * Convert SeaTunnel {@link DataType} to flink {@link TypeInformation}.
- *
- * @param seaTunnelDataType SeaTunnel {@link DataType}
- * @return flink {@link TypeInformation}
- */
@Override
- T2 convert(T1 seaTunnelDataType);
-
+ public org.apache.spark.sql.types.ArrayType convert(ArrayType<T1>
seaTunnelDataType) {
+ DataType elementType =
TypeConverterUtils.convert(seaTunnelDataType.getElementType());
+ return DataTypes.createArrayType(elementType);
+ }
}
diff --git
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/BasicTypeConverter.java
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/BasicTypeConverter.java
new file mode 100644
index 00000000..19615094
--- /dev/null
+++
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/BasicTypeConverter.java
@@ -0,0 +1,112 @@
+package org.apache.seatunnel.translation.spark.types;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+
+import org.apache.spark.sql.types.CharType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.DecimalType;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.time.Instant;
+import java.util.Date;
+
+public class BasicTypeConverter<T1>
+ implements SparkDataTypeConverter<BasicType<T1>, DataType> {
+
+ public static final BasicTypeConverter<String> STRING_CONVERTER =
+ new BasicTypeConverter<>(
+ BasicType.STRING,
+ DataTypes.StringType
+ );
+
+ public static final BasicTypeConverter<Integer> INTEGER_CONVERTER =
+ new BasicTypeConverter<>(
+ BasicType.INTEGER,
+ DataTypes.IntegerType
+ );
+
+ public static final BasicTypeConverter<Boolean> BOOLEAN_CONVERTER =
+ new BasicTypeConverter<>(
+ BasicType.BOOLEAN,
+ DataTypes.BooleanType);
+
+ public static final BasicTypeConverter<Date> DATE_CONVERTER =
+ new BasicTypeConverter<>(
+ BasicType.DATE,
+ DataTypes.DateType
+ );
+
+ public static final BasicTypeConverter<Double> DOUBLE_CONVERTER =
+ new BasicTypeConverter<>(
+ BasicType.DOUBLE,
+ DataTypes.DoubleType
+ );
+
+ public static final BasicTypeConverter<Long> LONG_CONVERTER =
+ new BasicTypeConverter<>(
+ BasicType.LONG,
+ DataTypes.LongType
+ );
+
+ public static final BasicTypeConverter<Float> FLOAT_CONVERTER =
+ new BasicTypeConverter<>(
+ BasicType.FLOAT,
+ DataTypes.FloatType);
+
+ public static final BasicTypeConverter<Byte> BYTE_CONVERTER =
+ new BasicTypeConverter<>(
+ BasicType.BYTE,
+ DataTypes.ByteType
+ );
+
+ public static final BasicTypeConverter<Short> SHORT_CONVERTER =
+ new BasicTypeConverter<>(
+ BasicType.SHORT,
+ DataTypes.ShortType);
+
+ // TODO: confirm that CharType(1) is the correct Spark mapping for BasicType.CHARACTER
+ public static final BasicTypeConverter<Character> CHARACTER_CONVERTER =
+ new BasicTypeConverter<>(
+ BasicType.CHARACTER,
+ new CharType(1)
+ );
+
+ public static final BasicTypeConverter<BigInteger> BIG_INTEGER_CONVERTER =
+ new BasicTypeConverter<>(
+ BasicType.BIG_INTEGER,
+ DataTypes.LongType
+ );
+
+ public static final BasicTypeConverter<BigDecimal> BID_DECIMAL_CONVERTER =
+ new BasicTypeConverter<>(
+ BasicType.BIG_DECIMAL,
+ new DecimalType()
+ );
+
+ public static final BasicTypeConverter<Instant> INSTANT_CONVERTER =
+ new BasicTypeConverter<>(
+ BasicType.INSTANT,
+ DataTypes.TimestampType
+ );
+
+ public static final BasicTypeConverter<Void> NULL_CONVERTER =
+ new BasicTypeConverter<>(
+ BasicType.NULL,
+ DataTypes.NullType
+ );
+
+ private final BasicType<T1> seatunnelDataType;
+ private final DataType sparkDataType;
+
+ public BasicTypeConverter(BasicType<T1> seatunnelDataType, DataType
sparkDataType) {
+ this.seatunnelDataType = seatunnelDataType;
+ this.sparkDataType = sparkDataType;
+ }
+
+ @Override
+ public DataType convert(BasicType<T1> seaTunnelDataType) {
+ return sparkDataType;
+ }
+}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ListType.java
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/PojoTypeConverter.java
similarity index 67%
copy from
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ListType.java
copy to
seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/PojoTypeConverter.java
index 41c57004..4c757151 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ListType.java
+++
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/PojoTypeConverter.java
@@ -15,17 +15,15 @@
* limitations under the License.
*/
-package org.apache.seatunnel.api.table.type;
+package org.apache.seatunnel.translation.spark.types;
-public class ListType<T> implements DataType<T> {
+import org.apache.seatunnel.api.table.type.PojoType;
- private final DataType<T> elementType;
+import org.apache.spark.sql.types.ObjectType;
- public ListType(DataType<T> elementType) {
- this.elementType = elementType;
- }
-
- public DataType<T> getElementType() {
- return elementType;
+public class PojoTypeConverter<T1> implements
SparkDataTypeConverter<PojoType<T1>, ObjectType> {
+ @Override
+ public ObjectType convert(PojoType<T1> seaTunnelDataType) {
+ return new ObjectType(seaTunnelDataType.getPojoClass());
}
}
diff --git
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/SparkDataTypeConverter.java
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/SparkDataTypeConverter.java
new file mode 100644
index 00000000..c7e0018b
--- /dev/null
+++
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/SparkDataTypeConverter.java
@@ -0,0 +1,18 @@
+package org.apache.seatunnel.translation.spark.types;
+
+import org.apache.seatunnel.api.table.type.Converter;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+
+import org.apache.spark.sql.types.DataType;
+
+public interface SparkDataTypeConverter<T1, T2> extends Converter<T1, T2> {
+
+ /**
+ * Convert SeaTunnel {@link SeaTunnelDataType} to Spark {@link DataType}.
+ *
+ * @param seaTunnelDataType SeaTunnel {@link SeaTunnelDataType}
+ * @return Spark {@link DataType}
+ */
+ @Override
+ T2 convert(T1 seaTunnelDataType);
+}
diff --git
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/FlinkTypeConverter.java
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/TimestampTypeConverter.java
similarity index 57%
copy from
seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/FlinkTypeConverter.java
copy to
seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/TimestampTypeConverter.java
index f5912f36..fd6540af 100644
---
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/FlinkTypeConverter.java
+++
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/TimestampTypeConverter.java
@@ -15,25 +15,23 @@
* limitations under the License.
*/
-package org.apache.seatunnel.translation.flink.types;
+package org.apache.seatunnel.translation.spark.types;
-import org.apache.seatunnel.api.table.type.Converter;
-import org.apache.seatunnel.api.table.type.DataType;
+import org.apache.seatunnel.api.table.type.TimestampType;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.spark.sql.types.DataTypes;
-/**
- * Convert SeaTunnel {@link DataType} to flink type.
- */
-public interface FlinkTypeConverter<T1, T2> extends Converter<T1, T2> {
-
- /**
- * Convert SeaTunnel {@link DataType} to flink {@link TypeInformation}.
- *
- * @param seaTunnelDataType SeaTunnel {@link DataType}
- * @return flink {@link TypeInformation}
- */
- @Override
- T2 convert(T1 seaTunnelDataType);
+/**
+ * Converts the SeaTunnel {@link TimestampType} to the Spark SQL timestamp type.
+ */
+public class TimestampTypeConverter
+ implements SparkDataTypeConverter<TimestampType,
org.apache.spark.sql.types.TimestampType> {
+
+ /** Shared instance; the constructor is private. */
+ public static final TimestampTypeConverter INSTANCE = new
TimestampTypeConverter();
+
+ private TimestampTypeConverter() {
+ // Use INSTANCE instead of creating new converters.
+ }
+
+ /**
+ * Return Spark's {@code DataTypes.TimestampType} for any SeaTunnel timestamp type.
+ *
+ * @param seaTunnelDataType SeaTunnel timestamp type (not inspected)
+ * @return Spark timestamp type
+ */
+ @Override
+ public org.apache.spark.sql.types.TimestampType convert(TimestampType
seaTunnelDataType) {
+ final org.apache.spark.sql.types.TimestampType sparkTimestampType =
+ (org.apache.spark.sql.types.TimestampType) DataTypes.TimestampType;
+ return sparkTimestampType;
+ }
}
diff --git
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/utils/TypeConverterUtils.java
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/utils/TypeConverterUtils.java
new file mode 100644
index 00000000..932cd22a
--- /dev/null
+++
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/utils/TypeConverterUtils.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.spark.utils;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.PojoType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.TimestampType;
+import org.apache.seatunnel.translation.spark.types.ArrayTypeConverter;
+import org.apache.seatunnel.translation.spark.types.BasicTypeConverter;
+import org.apache.seatunnel.translation.spark.types.PojoTypeConverter;
+import org.apache.seatunnel.translation.spark.types.TimestampTypeConverter;
+
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.ObjectType;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.Date;
+
+/**
+ * Utility methods that translate SeaTunnel data types into Spark SQL
+ * {@link DataType}s by dispatching to the matching converter.
+ */
+public class TypeConverterUtils {
+
+ private TypeConverterUtils() {
+ throw new UnsupportedOperationException("TypeConverterUtils is a
utility class and cannot be instantiated");
+ }
+
+ /**
+ * Convert a SeaTunnel data type to the corresponding Spark data type.
+ *
+ * @param seaTunnelDataType SeaTunnel data type to convert
+ * @param <T> type parameter of the SeaTunnel data type
+ * @return the Spark {@link DataType}
+ * @throws IllegalArgumentException if the type is not a basic, timestamp,
+ * array or pojo type, or is an unsupported basic type
+ */
+ public static <T> DataType convert(SeaTunnelDataType<T> seaTunnelDataType)
{
+ if (seaTunnelDataType instanceof BasicType) {
+ // The converted type must be returned here; without the return every
+ // basic type fell through to the "Unsupported data type" exception.
+ return convertBasicType((BasicType<T>) seaTunnelDataType);
+ }
+ if (seaTunnelDataType instanceof TimestampType) {
+ return TimestampTypeConverter.INSTANCE.convert((TimestampType)
seaTunnelDataType);
+ }
+ if (seaTunnelDataType instanceof ArrayType) {
+ return convertArrayType((ArrayType<T>) seaTunnelDataType);
+ }
+ if (seaTunnelDataType instanceof PojoType) {
+ return convertPojoType((PojoType<T>) seaTunnelDataType);
+ }
+
+ throw new IllegalArgumentException("Unsupported data type: " +
seaTunnelDataType);
+ }
+
+ /**
+ * Convert a SeaTunnel {@link BasicType} to a Spark data type, selecting the
+ * converter by the type's physical class.
+ *
+ * <p>Handled physical classes: Boolean, String, Date, Double, Integer, Long,
+ * Float, Byte, Short, Character, BigInteger, BigDecimal and Void.
+ *
+ * @param basicType SeaTunnel basic type to convert
+ * @param <T> physical type of the basic type
+ * @return the Spark {@link DataType} produced by the matching converter
+ * @throws IllegalArgumentException if the physical class is not handled
+ */
+ @SuppressWarnings("unchecked")
+ public static <T> DataType convertBasicType(BasicType<T> basicType) {
+ Class<T> physicalTypeClass = basicType.getPhysicalTypeClass();
+ if (physicalTypeClass == Boolean.class) {
+ BasicType<Boolean> booleanBasicType = (BasicType<Boolean>)
basicType;
+ return
BasicTypeConverter.BOOLEAN_CONVERTER.convert(booleanBasicType);
+ }
+ if (physicalTypeClass == String.class) {
+ BasicType<String> stringBasicType = (BasicType<String>) basicType;
+ return
BasicTypeConverter.STRING_CONVERTER.convert(stringBasicType);
+ }
+ if (physicalTypeClass == Date.class) {
+ BasicType<Date> dateBasicType = (BasicType<Date>) basicType;
+ return BasicTypeConverter.DATE_CONVERTER.convert(dateBasicType);
+ }
+ if (physicalTypeClass == Double.class) {
+ BasicType<Double> doubleBasicType = (BasicType<Double>) basicType;
+ return
BasicTypeConverter.DOUBLE_CONVERTER.convert(doubleBasicType);
+ }
+ if (physicalTypeClass == Integer.class) {
+ BasicType<Integer> integerBasicType = (BasicType<Integer>)
basicType;
+ return
BasicTypeConverter.INTEGER_CONVERTER.convert(integerBasicType);
+ }
+ if (physicalTypeClass == Long.class) {
+ BasicType<Long> longBasicType = (BasicType<Long>) basicType;
+ return BasicTypeConverter.LONG_CONVERTER.convert(longBasicType);
+ }
+ if (physicalTypeClass == Float.class) {
+ BasicType<Float> floatBasicType = (BasicType<Float>) basicType;
+ return BasicTypeConverter.FLOAT_CONVERTER.convert(floatBasicType);
+ }
+ if (physicalTypeClass == Byte.class) {
+ BasicType<Byte> byteBasicType = (BasicType<Byte>) basicType;
+ return BasicTypeConverter.BYTE_CONVERTER.convert(byteBasicType);
+ }
+ if (physicalTypeClass == Short.class) {
+ BasicType<Short> shortBasicType = (BasicType<Short>) basicType;
+ return BasicTypeConverter.SHORT_CONVERTER.convert(shortBasicType);
+ }
+ if (physicalTypeClass == Character.class) {
+ BasicType<Character> characterBasicType = (BasicType<Character>)
basicType;
+ return
BasicTypeConverter.CHARACTER_CONVERTER.convert(characterBasicType);
+ }
+ if (physicalTypeClass == BigInteger.class) {
+ BasicType<BigInteger> bigIntegerBasicType =
(BasicType<BigInteger>) basicType;
+ return
BasicTypeConverter.BIG_INTEGER_CONVERTER.convert(bigIntegerBasicType);
+ }
+ if (physicalTypeClass == BigDecimal.class) {
+ BasicType<BigDecimal> bigDecimalBasicType =
(BasicType<BigDecimal>) basicType;
+ // NOTE(review): the constant is spelled BID_ (presumably meant BIG_);
+ // it is declared in BasicTypeConverter, so it is not renamed here.
+ return
BasicTypeConverter.BID_DECIMAL_CONVERTER.convert(bigDecimalBasicType);
+ }
+ if (physicalTypeClass == Void.class) {
+ BasicType<Void> voidBasicType = (BasicType<Void>) basicType;
+ return BasicTypeConverter.NULL_CONVERTER.convert(voidBasicType);
+ }
+ throw new IllegalArgumentException("Unsupported basic type: " +
basicType);
+ }
+
+ /**
+ * Convert a SeaTunnel {@link ArrayType} using a new {@link ArrayTypeConverter}.
+ *
+ * @param arrayType SeaTunnel array type to convert
+ * @param <T1> type parameter of the SeaTunnel array type
+ * @return the Spark array type
+ */
+ public static <T1> org.apache.spark.sql.types.ArrayType
convertArrayType(ArrayType<T1> arrayType) {
+ ArrayTypeConverter<T1> arrayTypeConverter = new ArrayTypeConverter<>();
+ return arrayTypeConverter.convert(arrayType);
+ }
+
+ /**
+ * Convert a SeaTunnel {@link PojoType} using a new {@link PojoTypeConverter}.
+ *
+ * @param pojoType SeaTunnel pojo type to convert
+ * @param <T> pojo class described by the SeaTunnel type
+ * @return the Spark {@link ObjectType}
+ */
+ public static <T> ObjectType convertPojoType(PojoType<T> pojoType) {
+ PojoTypeConverter<T> pojoTypeConverter = new PojoTypeConverter<>();
+ return pojoTypeConverter.convert(pojoType);
+ }
+}