ashulin commented on code in PR #2750:
URL:
https://github.com/apache/incubator-seatunnel/pull/2750#discussion_r973567728
##########
seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/serialization/InternalRowConverter.java:
##########
@@ -213,4 +225,60 @@ private static SeaTunnelRow reconvert(InternalRow
engineRow, SeaTunnelRowType ro
}
return new SeaTunnelRow(fields);
}
+
+ private static DataType seaTunnelType2SparkType(SeaTunnelDataType<?>
seaTunnelDataType) {
+ SqlType sqlType = seaTunnelDataType.getSqlType();
+ switch (sqlType) {
+ case ARRAY:
+ BasicType<?> elementType = ((ArrayType<?, ?>)
seaTunnelDataType).getElementType();
+ return
DataTypes.createArrayType(seaTunnelType2SparkType(elementType));
+ case MAP:
+ SeaTunnelDataType<?> keyType = ((MapType<?, ?>)
seaTunnelDataType).getKeyType();
+ SeaTunnelDataType<?> valueType = ((MapType<?, ?>)
seaTunnelDataType).getValueType();
+ return
DataTypes.createMapType(seaTunnelType2SparkType(keyType),
seaTunnelType2SparkType(valueType));
+ case STRING:
+ return DataTypes.StringType;
+ case BOOLEAN:
+ return DataTypes.BooleanType;
+ case TINYINT:
+ return DataTypes.ByteType;
+ case SMALLINT:
+ return DataTypes.ShortType;
+ case INT:
+ return DataTypes.IntegerType;
+ case BIGINT:
+ return DataTypes.LongType;
+ case FLOAT:
+ return DataTypes.FloatType;
+ case DOUBLE:
+ return DataTypes.DoubleType;
+ case DECIMAL:
+ int precision = ((DecimalType)
seaTunnelDataType).getPrecision();
+ int scale = ((DecimalType) seaTunnelDataType).getScale();
+ return DataTypes.createDecimalType(precision, scale);
+ case NULL:
+ return DataTypes.NullType;
+ case BYTES:
+ return DataTypes.BinaryType;
+ case DATE:
+ return DataTypes.DateType;
+ case TIMESTAMP:
+ return DataTypes.TimestampType;
+ case TIME:
+ throw new RuntimeException("SeaTunnel not support time type,
it will be supported in the future");
+ case ROW:
+ ArrayList<StructField> structFields = new ArrayList<>();
+ SeaTunnelDataType<?>[] fieldTypes = ((SeaTunnelRowType)
seaTunnelDataType).getFieldTypes();
+ String[] fieldNames = ((SeaTunnelRowType)
seaTunnelDataType).getFieldNames();
+ for (int i = 0; i < fieldNames.length; i++) {
+ StructField structField = new StructField(fieldNames[i],
seaTunnelType2SparkType(fieldTypes[i]), true, null);
+ structFields.add(structField);
+ }
+ return DataTypes.createStructType(structFields);
+ default:
+ // do nothing
+ // never get in there
+ return null;
+ }
+ }
Review Comment:
Please use `TypeConverterUtils#convert(SeaTunnelDataType<?>)`
##########
seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/serialization/InternalRowConverter.java:
##########
@@ -111,23 +128,35 @@ private static InternalRow convert(SeaTunnelRow
seaTunnelRow, SeaTunnelRowType r
private static Object convertMap(Map<?, ?> mapData, MapType<?, ?> mapType,
BiFunction<Object, SeaTunnelDataType<?>, Object> convertFunction) {
if (mapData == null || mapData.size() == 0) {
- return mapData;
+ return ArrayBasedMapData.apply(new Object[]{}, new Object[]{});
}
- switch (mapType.getValueType().getSqlType()) {
- case MAP:
- case ROW:
- case DATE:
- case TIME:
- case TIMESTAMP:
- Map<Object, Object> newMap = new HashMap<>(mapData.size());
- mapData.forEach((key, value) -> {
- SeaTunnelDataType<?> valueType = mapType.getValueType();
- newMap.put(key, convertFunction.apply(value, valueType));
- });
- return newMap;
- default:
- return mapData;
+ Map<Object, Object> newMap = new HashMap<>(mapData.size());
+ mapData.forEach((key, value) -> {
+ SeaTunnelDataType<?> keyType = mapType.getKeyType();
+ SeaTunnelDataType<?> valueType = mapType.getValueType();
+ newMap.put(convertFunction.apply(key, keyType),
convertFunction.apply(value, valueType));
+ });
+ Object[] keys = newMap.keySet().toArray();
+ Object[] values = newMap.values().toArray();
+ return ArrayBasedMapData.apply(keys, values);
+ }
+
+ private static Object reconvertMap(MapData mapData, MapType<?, ?> mapType,
BiFunction<Object, SeaTunnelDataType<?>, Object> convertFunction) {
+ if (mapData == null || mapData.numElements() == 0) {
+ return Collections.emptyMap();
+ }
+ Map<Object, Object> newMap = new HashMap<>(mapData.numElements());
+ int num = mapData.numElements();
+ SeaTunnelDataType<?> keyType = mapType.getKeyType();
+ SeaTunnelDataType<?> valueType = mapType.getValueType();
+ Object[] keys =
mapData.keyArray().toObjectArray(seaTunnelType2SparkType(keyType));
+ Object[] values =
mapData.valueArray().toObjectArray(seaTunnelType2SparkType(valueType));
+ for (int i = 0; i < num; i++) {
+ keys[i] = convertFunction.apply(keys[i], keyType);
+ values[i] = convertFunction.apply(values[i], valueType);
+ newMap.put(keys[i], values[i]);
}
Review Comment:
Couldn't it be written like this here?
```java
Map<Object, Object> newMap = new HashMap<>(mapData.size());
mapData.forEach((key, value) -> {
SeaTunnelDataType<?> keyType = mapType.getKeyType();
SeaTunnelDataType<?> valueType = mapType.getValueType();
newMap.put(convertFunction.apply(key, keyType),
convertFunction.apply(value, valueType));
});
```
##########
seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/serialization/InternalRowConverter.java:
##########
@@ -111,23 +128,35 @@ private static InternalRow convert(SeaTunnelRow
seaTunnelRow, SeaTunnelRowType r
private static Object convertMap(Map<?, ?> mapData, MapType<?, ?> mapType,
BiFunction<Object, SeaTunnelDataType<?>, Object> convertFunction) {
if (mapData == null || mapData.size() == 0) {
- return mapData;
+ return ArrayBasedMapData.apply(new Object[]{}, new Object[]{});
}
- switch (mapType.getValueType().getSqlType()) {
- case MAP:
- case ROW:
- case DATE:
- case TIME:
- case TIMESTAMP:
- Map<Object, Object> newMap = new HashMap<>(mapData.size());
- mapData.forEach((key, value) -> {
- SeaTunnelDataType<?> valueType = mapType.getValueType();
- newMap.put(key, convertFunction.apply(value, valueType));
- });
- return newMap;
- default:
- return mapData;
+ Map<Object, Object> newMap = new HashMap<>(mapData.size());
+ mapData.forEach((key, value) -> {
+ SeaTunnelDataType<?> keyType = mapType.getKeyType();
+ SeaTunnelDataType<?> valueType = mapType.getValueType();
+ newMap.put(convertFunction.apply(key, keyType),
convertFunction.apply(value, valueType));
+ });
+ Object[] keys = newMap.keySet().toArray();
+ Object[] values = newMap.values().toArray();
+ return ArrayBasedMapData.apply(keys, values);
+ }
+
+ private static Object reconvertMap(MapData mapData, MapType<?, ?> mapType,
BiFunction<Object, SeaTunnelDataType<?>, Object> convertFunction) {
+ if (mapData == null || mapData.numElements() == 0) {
+ return Collections.emptyMap();
+ }
+ Map<Object, Object> newMap = new HashMap<>(mapData.numElements());
+ int num = mapData.numElements();
+ SeaTunnelDataType<?> keyType = mapType.getKeyType();
+ SeaTunnelDataType<?> valueType = mapType.getValueType();
+ Object[] keys =
mapData.keyArray().toObjectArray(seaTunnelType2SparkType(keyType));
+ Object[] values =
mapData.valueArray().toObjectArray(seaTunnelType2SparkType(valueType));
+ for (int i = 0; i < num; i++) {
+ keys[i] = convertFunction.apply(keys[i], keyType);
+ values[i] = convertFunction.apply(values[i], valueType);
+ newMap.put(keys[i], values[i]);
}
Review Comment:
If it can't, remove the `convertFunction` parameter from `#convertMap` and
`#reconvertMap`: it was introduced to support both convert and reconvert, so it
is now unnecessary.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]