This is an automated email from the ASF dual-hosted git repository.
liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 4b3af9bef4 [Improve][Doris Connector] Unified serialization method, use
RowToJsonConverter and TextSerializationSchema (#7229)
4b3af9bef4 is described below
commit 4b3af9bef4e1753838a7750e86bde71bae8562ae
Author: Guangdong Liu <[email protected]>
AuthorDate: Mon Jul 22 13:04:19 2024 +0800
[Improve][Doris Connector] Unified serialization method, use
RowToJsonConverter and TextSerializationSchema (#7229)
* 1
* 1
* 1
* Update
seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/serialization/RowBatch.java
Co-authored-by: Jia Fan <[email protected]>
---------
Co-authored-by: gdliu3 <[email protected]>
Co-authored-by: Jia Fan <[email protected]>
---
.../doris/datatype/DorisTypeConverterFactory.java | 4 +-
.../doris/serialize/SeaTunnelRowConverter.java | 107 -----------------
.../doris/serialize/SeaTunnelRowSerializer.java | 130 +++++++++------------
.../doris/source/serialization/RowBatch.java | 74 ++++++++++--
.../doris/serialize/SeaTunnelRowConverterTest.java | 54 ---------
.../format/json/JsonSerializationSchema.java | 7 ++
.../seatunnel/format/json/RowToJsonConverters.java | 37 +++---
.../format/json/JsonRowDataSerDeSchemaTest.java | 77 ++++++++++++
.../format/text/TextSerializationSchema.java | 16 ++-
.../format/text/CsvTextFormatSchemaTest.java | 40 +++++++
.../format/text/TextFormatSchemaTest.java | 41 +++++++
11 files changed, 322 insertions(+), 265 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterFactory.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterFactory.java
index 4206e4fdc6..04b33f3364 100644
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterFactory.java
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterFactory.java
@@ -19,8 +19,6 @@ package org.apache.seatunnel.connectors.doris.datatype;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.api.table.converter.TypeConverter;
-import org.apache.seatunnel.common.exception.CommonError;
-import org.apache.seatunnel.connectors.doris.config.DorisConfig;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
@@ -37,7 +35,7 @@ public class DorisTypeConverterFactory {
||
dorisVersion.toLowerCase(Locale.ROOT).startsWith("selectdb-doris-2.")) {
return DorisTypeConverterV2.INSTANCE;
} else {
- throw CommonError.unsupportedVersion(DorisConfig.IDENTIFIER,
dorisVersion);
+ return DorisTypeConverterV2.INSTANCE;
}
}
}
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowConverter.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowConverter.java
deleted file mode 100644
index 0fd8e27306..0000000000
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowConverter.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.doris.serialize;
-
-import org.apache.seatunnel.api.table.type.ArrayType;
-import org.apache.seatunnel.api.table.type.DecimalArrayType;
-import org.apache.seatunnel.api.table.type.MapType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
-import org.apache.seatunnel.common.utils.DateTimeUtils;
-import org.apache.seatunnel.common.utils.DateUtils;
-import org.apache.seatunnel.common.utils.TimeUtils;
-import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException;
-
-import lombok.Builder;
-
-import java.math.BigDecimal;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.LocalTime;
-import java.util.LinkedHashMap;
-import java.util.Map;
-
-public class SeaTunnelRowConverter {
- @Builder.Default private DateUtils.Formatter dateFormatter =
DateUtils.Formatter.YYYY_MM_DD;
-
- @Builder.Default
- private DateTimeUtils.Formatter dateTimeFormatter =
- DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS_SSSSSS;
-
- @Builder.Default private TimeUtils.Formatter timeFormatter =
TimeUtils.Formatter.HH_MM_SS;
-
- protected Object convert(SeaTunnelDataType dataType, Object val) {
- if (val == null) {
- return null;
- }
- switch (dataType.getSqlType()) {
- case TINYINT:
- case SMALLINT:
- case INT:
- case BIGINT:
- case FLOAT:
- case DOUBLE:
- case DECIMAL:
- case BOOLEAN:
- case STRING:
- return val;
- case DATE:
- return DateUtils.toString((LocalDate) val, dateFormatter);
- case TIME:
- return TimeUtils.toString((LocalTime) val, timeFormatter);
- case TIMESTAMP:
- return DateTimeUtils.toString((LocalDateTime) val,
dateTimeFormatter);
- case ARRAY:
- return convertArray(dataType, val);
- case MAP:
- return convertMap(dataType, val);
- case BYTES:
- return new String((byte[]) val);
- default:
- throw new DorisConnectorException(
- CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
- dataType + " is not supported ");
- }
- }
-
- public Object[] convertArray(SeaTunnelDataType dataType, Object val) {
- if (dataType instanceof DecimalArrayType) {
- return (BigDecimal[]) val;
- }
-
- SeaTunnelDataType elementType = ((ArrayType)
dataType).getElementType();
- Object[] realValue = (Object[]) val;
- Object[] newArrayValue = new Object[realValue.length];
- for (int i = 0; i < realValue.length; i++) {
- newArrayValue[i] = convert(elementType, realValue[i]);
- }
- return newArrayValue;
- }
-
- public Map<Object, Object> convertMap(SeaTunnelDataType dataType, Object
val) {
- MapType valueMapType = (MapType) dataType;
- Map<Object, Object> realValue = (Map<Object, Object>) val;
- Map<Object, Object> newMapValue = new LinkedHashMap<>();
- for (Map.Entry entry : realValue.entrySet()) {
- newMapValue.put(
- convert(valueMapType.getKeyType(), entry.getKey()),
- convert(valueMapType.getValueType(), entry.getValue()));
- }
- return newMapValue;
- }
-}
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowSerializer.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowSerializer.java
index 4bfc148d86..0c5b9c0c42 100644
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowSerializer.java
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowSerializer.java
@@ -17,27 +17,28 @@
package org.apache.seatunnel.connectors.doris.serialize;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
+
import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.doris.sink.writer.LoadConstants;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.seatunnel.format.json.JsonSerializationSchema;
+import org.apache.seatunnel.format.text.TextSerializationSchema;
import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.StringJoiner;
+import java.util.Arrays;
+import java.util.List;
-import static com.google.common.base.Preconditions.checkState;
+import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
import static
org.apache.seatunnel.connectors.doris.sink.writer.LoadConstants.CSV;
import static
org.apache.seatunnel.connectors.doris.sink.writer.LoadConstants.JSON;
import static
org.apache.seatunnel.connectors.doris.sink.writer.LoadConstants.NULL_VALUE;
-public class SeaTunnelRowSerializer extends SeaTunnelRowConverter implements
DorisSerializer {
+public class SeaTunnelRowSerializer implements DorisSerializer {
String type;
- private ObjectMapper objectMapper;
private final SeaTunnelRowType seaTunnelRowType;
private final String fieldDelimiter;
private final boolean enableDelete;
@@ -51,48 +52,29 @@ public class SeaTunnelRowSerializer extends
SeaTunnelRowConverter implements Dor
this.seaTunnelRowType = seaTunnelRowType;
this.fieldDelimiter = fieldDelimiter;
this.enableDelete = enableDelete;
- if (JSON.equals(type)) {
- objectMapper = new ObjectMapper();
- }
}
- @Override
- public byte[] serialize(SeaTunnelRow seaTunnelRow) throws IOException {
- String valString;
- if (JSON.equals(type)) {
- valString = buildJsonString(seaTunnelRow);
- } else if (CSV.equals(type)) {
- valString = buildCSVString(seaTunnelRow);
- } else {
- throw new IllegalArgumentException("The type " + type + " is not
supported!");
- }
- return valString.getBytes(StandardCharsets.UTF_8);
+ public byte[] buildJsonString(SeaTunnelRow row, SeaTunnelRowType
seaTunnelRowType)
+ throws IOException {
+
+ JsonSerializationSchema jsonSerializationSchema =
+ new JsonSerializationSchema(seaTunnelRowType, NULL_VALUE);
+ ObjectMapper mapper = jsonSerializationSchema.getMapper();
+ mapper.configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN,
true);
+ return jsonSerializationSchema.serialize(row);
}
- public String buildJsonString(SeaTunnelRow row) throws IOException {
- Map<String, Object> rowMap = new HashMap<>(row.getFields().length);
+ public byte[] buildCSVString(SeaTunnelRow row, SeaTunnelRowType
seaTunnelRowType)
+ throws IOException {
- for (int i = 0; i < row.getFields().length; i++) {
- Object value = convert(seaTunnelRowType.getFieldType(i),
row.getField(i));
- rowMap.put(seaTunnelRowType.getFieldName(i), value);
- }
- if (enableDelete) {
- rowMap.put(LoadConstants.DORIS_DELETE_SIGN,
parseDeleteSign(row.getRowKind()));
- }
- return objectMapper.writeValueAsString(rowMap);
- }
+ TextSerializationSchema build =
+ TextSerializationSchema.builder()
+ .seaTunnelRowType(seaTunnelRowType)
+ .delimiter(fieldDelimiter)
+ .nullValue(NULL_VALUE)
+ .build();
- public String buildCSVString(SeaTunnelRow row) throws IOException {
- StringJoiner joiner = new StringJoiner(fieldDelimiter);
- for (int i = 0; i < row.getFields().length; i++) {
- Object field = convert(seaTunnelRowType.getFieldType(i),
row.getField(i));
- String value = field != null ? field.toString() : NULL_VALUE;
- joiner.add(value);
- }
- if (enableDelete) {
- joiner.add(parseDeleteSign(row.getRowKind()));
- }
- return joiner.toString();
+ return build.serialize(row);
}
public String parseDeleteSign(RowKind rowKind) {
@@ -105,46 +87,40 @@ public class SeaTunnelRowSerializer extends
SeaTunnelRowConverter implements Dor
}
}
- public static Builder builder() {
- return new Builder();
- }
-
- /** Builder for RowDataSerializer. */
- public static class Builder {
- private SeaTunnelRowType seaTunnelRowType;
- private String type;
- private String fieldDelimiter;
- private boolean deletable;
-
- public Builder setType(String type) {
- this.type = type;
- return this;
- }
+ @Override
+ public void open() throws IOException {}
- public Builder setSeaTunnelRowType(SeaTunnelRowType seaTunnelRowType) {
- this.seaTunnelRowType = seaTunnelRowType;
- return this;
- }
+ @Override
+ public byte[] serialize(SeaTunnelRow seaTunnelRow) throws IOException {
- public Builder setFieldDelimiter(String fieldDelimiter) {
- this.fieldDelimiter = fieldDelimiter;
- return this;
- }
+ List<String> fieldNames =
Arrays.asList(seaTunnelRowType.getFieldNames());
+ List<SeaTunnelDataType<?>> fieldTypes =
Arrays.asList(seaTunnelRowType.getFieldTypes());
- public Builder enableDelete(boolean deletable) {
- this.deletable = deletable;
- return this;
+ if (enableDelete) {
+ SeaTunnelRow seaTunnelRowEnableDelete = seaTunnelRow.copy();
+ seaTunnelRowEnableDelete.setField(
+ seaTunnelRow.getFields().length,
parseDeleteSign(seaTunnelRow.getRowKind()));
+ fieldNames.add(LoadConstants.DORIS_DELETE_SIGN);
+ fieldTypes.add(STRING_TYPE);
}
- public SeaTunnelRowSerializer build() {
- checkState(CSV.equals(type) && fieldDelimiter != null ||
JSON.equals(type));
- return new SeaTunnelRowSerializer(type, seaTunnelRowType,
fieldDelimiter, deletable);
+ if (JSON.equals(type)) {
+ return buildJsonString(
+ seaTunnelRow,
+ new SeaTunnelRowType(
+ fieldNames.toArray(new String[0]),
+ fieldTypes.toArray(new SeaTunnelDataType<?>[0])));
+ } else if (CSV.equals(type)) {
+ return buildCSVString(
+ seaTunnelRow,
+ new SeaTunnelRowType(
+ fieldNames.toArray(new String[0]),
+ fieldTypes.toArray(new SeaTunnelDataType<?>[0])));
+ } else {
+ throw new IllegalArgumentException("The type " + type + " is not
supported!");
}
}
- @Override
- public void open() throws IOException {}
-
@Override
public void close() throws IOException {}
}
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/serialization/RowBatch.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/serialization/RowBatch.java
index a569e2b285..930e83c568 100644
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/serialization/RowBatch.java
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/serialization/RowBatch.java
@@ -20,6 +20,7 @@ package
org.apache.seatunnel.connectors.doris.source.serialization;
import org.apache.seatunnel.shade.org.apache.arrow.memory.RootAllocator;
import org.apache.seatunnel.shade.org.apache.arrow.vector.BigIntVector;
import org.apache.seatunnel.shade.org.apache.arrow.vector.BitVector;
+import org.apache.seatunnel.shade.org.apache.arrow.vector.DateDayVector;
import org.apache.seatunnel.shade.org.apache.arrow.vector.DecimalVector;
import org.apache.seatunnel.shade.org.apache.arrow.vector.FieldVector;
import
org.apache.seatunnel.shade.org.apache.arrow.vector.FixedSizeBinaryVector;
@@ -27,6 +28,7 @@ import
org.apache.seatunnel.shade.org.apache.arrow.vector.Float4Vector;
import org.apache.seatunnel.shade.org.apache.arrow.vector.Float8Vector;
import org.apache.seatunnel.shade.org.apache.arrow.vector.IntVector;
import org.apache.seatunnel.shade.org.apache.arrow.vector.SmallIntVector;
+import org.apache.seatunnel.shade.org.apache.arrow.vector.TimeStampMicroVector;
import org.apache.seatunnel.shade.org.apache.arrow.vector.TinyIntVector;
import org.apache.seatunnel.shade.org.apache.arrow.vector.VarCharVector;
import org.apache.seatunnel.shade.org.apache.arrow.vector.VectorSchemaRoot;
@@ -46,6 +48,8 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.table.type.SqlType;
+import org.apache.seatunnel.common.utils.DateTimeUtils;
+import org.apache.seatunnel.common.utils.DateUtils;
import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode;
import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException;
@@ -71,21 +75,21 @@ import java.util.function.IntFunction;
@Slf4j
public class RowBatch {
+ SeaTunnelDataType<?>[] fieldTypes;
+ private final ArrowStreamReader arrowStreamReader;
+ private final String DATETIME_PATTERN = "yyyy-MM-dd HH:mm:ss";
+ private final String DATETIMEV2_PATTERN = "yyyy-MM-dd HH:mm:ss.SSSSSS";
+ private final DateTimeFormatter dateTimeV2Formatter =
+ DateTimeFormatter.ofPattern(DATETIMEV2_PATTERN);
+ private final DateTimeFormatter dateFormatter =
DateTimeFormatter.ofPattern("yyyy-MM-dd");
// offset for iterate the rowBatch
private int offsetInRowBatch = 0;
private int rowCountInOneBatch = 0;
private int readRowCount = 0;
- SeaTunnelDataType<?>[] fieldTypes;
private List<SeaTunnelRow> seatunnelRowBatch = new ArrayList<>();
- private final ArrowStreamReader arrowStreamReader;
private VectorSchemaRoot root;
private List<FieldVector> fieldVectors;
private RootAllocator rootAllocator;
- private final String DATETIME_PATTERN = "yyyy-MM-dd HH:mm:ss";
- private final String DATETIMEV2_PATTERN = "yyyy-MM-dd HH:mm:ss.SSSSSS";
- private final DateTimeFormatter dateTimeV2Formatter =
- DateTimeFormatter.ofPattern(DATETIMEV2_PATTERN);
- private final DateTimeFormatter dateFormatter =
DateTimeFormatter.ofPattern("yyyy-MM-dd");
public RowBatch(TScanBatchResult nextResult, SeaTunnelRowType
seaTunnelRowType) {
this.rootAllocator = new RootAllocator(Integer.MAX_VALUE);
@@ -293,6 +297,19 @@ public class RowBatch {
return new BigDecimal(new BigInteger(bytes),
0);
});
break;
+ } else if (fieldVector instanceof VarCharVector) {
+ VarCharVector varCharVector = (VarCharVector) fieldVector;
+ Preconditions.checkArgument(
+ minorType.equals(Types.MinorType.VARCHAR),
+ typeMismatchMessage(currentType, minorType));
+ addValueToRowForAllRows(
+ col,
+ rowIndex ->
+ varCharVector.isNull(rowIndex)
+ ? null
+ : new BigDecimal(
+ new
String(varCharVector.get(rowIndex))));
+ break;
}
DecimalVector decimalVector = (DecimalVector) fieldVector;
Preconditions.checkArgument(
@@ -307,6 +324,21 @@ public class RowBatch {
break;
case "DATE":
case "DATEV2":
+ if (fieldVector instanceof DateDayVector) {
+ DateDayVector dateVector = (DateDayVector) fieldVector;
+ Preconditions.checkArgument(
+ minorType.equals(Types.MinorType.DATEDAY),
+ typeMismatchMessage(currentType, minorType));
+ addValueToRowForAllRows(
+ col,
+ rowIndex -> {
+ if (dateVector.isNull(rowIndex)) {
+ return null;
+ }
+ return
LocalDate.ofEpochDay(dateVector.get(rowIndex));
+ });
+ break;
+ }
VarCharVector dateVector = (VarCharVector) fieldVector;
Preconditions.checkArgument(
minorType.equals(Types.MinorType.VARCHAR),
@@ -322,6 +354,22 @@ public class RowBatch {
});
break;
case "TIMESTAMP":
+ if (fieldVector instanceof TimeStampMicroVector) {
+ TimeStampMicroVector timestampVector =
(TimeStampMicroVector) fieldVector;
+
+ addValueToRowForAllRows(
+ col,
+ rowIndex -> {
+ if (timestampVector.isNull(rowIndex)) {
+ return null;
+ }
+ String stringValue =
timestampVector.getObject(rowIndex).toString();
+ stringValue =
completeMilliseconds(stringValue);
+
+ return DateTimeUtils.parse(stringValue);
+ });
+ break;
+ }
VarCharVector timestampVector = (VarCharVector) fieldVector;
Preconditions.checkArgument(
minorType.equals(Types.MinorType.VARCHAR),
@@ -499,6 +547,9 @@ public class RowBatch {
}
if (vectorObject instanceof Integer) {
+ if (sqlType.equals(SqlType.DATE)) {
+ return LocalDate.ofEpochDay((int) vectorObject);
+ }
return Integer.valueOf(vectorObject.toString());
}
@@ -520,6 +571,8 @@ public class RowBatch {
return LocalDateTime.parse(stringValue, dateTimeV2Formatter);
} else if (sqlType.equals(SqlType.DATE)) {
return LocalDate.parse(vectorObject.toString(), dateFormatter);
+ } else if (sqlType.equals(SqlType.DECIMAL)) {
+ return new BigDecimal(vectorObject.toString());
}
return vectorObject.toString();
}
@@ -540,6 +593,13 @@ public class RowBatch {
}
return new BigDecimal(new BigInteger(bytes), 0);
}
+ if (vectorObject instanceof LocalDate) {
+ return DateUtils.parse(vectorObject.toString());
+ }
+
+ if (vectorObject instanceof LocalDateTime) {
+ return DateTimeUtils.parse(vectorObject.toString());
+ }
return vectorObject.toString();
}
diff --git
a/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowConverterTest.java
b/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowConverterTest.java
deleted file mode 100644
index 5755beb3f7..0000000000
---
a/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowConverterTest.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.doris.serialize;
-
-import org.apache.seatunnel.api.table.type.LocalTimeType;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-import java.time.LocalDateTime;
-
-public class SeaTunnelRowConverterTest {
-
- private static final SeaTunnelRowConverter seaTunnelRowConverter = new
SeaTunnelRowConverter();
-
- @Test
- void testDateTimeWithNano() {
- Assertions.assertEquals(
- "2021-01-01 00:00:00.123456",
- seaTunnelRowConverter.convert(
- LocalTimeType.LOCAL_DATE_TIME_TYPE,
- LocalDateTime.of(2021, 1, 1, 0, 0, 0, 123456789)));
- Assertions.assertEquals(
- "2021-01-01 00:00:00.000000",
- seaTunnelRowConverter.convert(
- LocalTimeType.LOCAL_DATE_TIME_TYPE,
- LocalDateTime.of(2021, 1, 1, 0, 0, 0, 0)));
- Assertions.assertEquals(
- "2021-01-01 00:00:00.000001",
- seaTunnelRowConverter.convert(
- LocalTimeType.LOCAL_DATE_TIME_TYPE,
- LocalDateTime.of(2021, 1, 1, 0, 0, 0, 1000)));
- Assertions.assertEquals(
- "2021-01-01 00:00:00.000123",
- seaTunnelRowConverter.convert(
- LocalTimeType.LOCAL_DATE_TIME_TYPE,
- LocalDateTime.of(2021, 1, 1, 0, 0, 0, 123456)));
- }
-}
diff --git
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonSerializationSchema.java
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonSerializationSchema.java
index 4e2e98317b..b35710b3a0 100644
---
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonSerializationSchema.java
+++
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonSerializationSchema.java
@@ -59,6 +59,13 @@ public class JsonSerializationSchema implements
SerializationSchema {
this.charset = charset;
}
+ public JsonSerializationSchema(SeaTunnelRowType rowType, String nullValue)
{
+ this.rowType = rowType;
+ this.runtimeConverter =
+ new
RowToJsonConverters().createConverter(checkNotNull(rowType), nullValue);
+ this.charset = StandardCharsets.UTF_8;
+ }
+
@Override
public byte[] serialize(SeaTunnelRow row) {
if (node == null) {
diff --git
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/RowToJsonConverters.java
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/RowToJsonConverters.java
index 575b5bace1..2cf8ae092e 100644
---
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/RowToJsonConverters.java
+++
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/RowToJsonConverters.java
@@ -49,15 +49,25 @@ public class RowToJsonConverters implements Serializable {
private static final long serialVersionUID = 6988876688930916940L;
+ private String nullValue;
+
public RowToJsonConverter createConverter(SeaTunnelDataType<?> type) {
return wrapIntoNullableConverter(createNotNullConverter(type));
}
+ public RowToJsonConverter createConverter(SeaTunnelDataType<?> type,
String nullValue) {
+ this.nullValue = nullValue;
+ return createConverter(type);
+ }
+
private RowToJsonConverter wrapIntoNullableConverter(RowToJsonConverter
converter) {
return new RowToJsonConverter() {
@Override
public JsonNode convert(ObjectMapper mapper, JsonNode reuse,
Object value) {
if (value == null) {
+ if (nullValue != null) {
+ return mapper.getNodeFactory().textNode(nullValue);
+ }
return mapper.getNodeFactory().nullNode();
}
return converter.convert(mapper, reuse, value);
@@ -74,7 +84,9 @@ public class RowToJsonConverters implements Serializable {
return new RowToJsonConverter() {
@Override
public JsonNode convert(ObjectMapper mapper, JsonNode
reuse, Object value) {
- return null;
+ return nullValue == null
+ ? null
+ : mapper.getNodeFactory().textNode((String)
value);
}
};
case BOOLEAN:
@@ -175,8 +187,7 @@ public class RowToJsonConverters implements Serializable {
return createArrayConverter((ArrayType) type);
case MAP:
MapType mapType = (MapType) type;
- return createMapConverter(
- mapType.toString(), mapType.getKeyType(),
mapType.getValueType());
+ return createMapConverter(mapType.getKeyType(),
mapType.getValueType());
default:
throw new SeaTunnelJsonFormatException(
CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
@@ -258,15 +269,10 @@ public class RowToJsonConverters implements Serializable {
}
private RowToJsonConverter createMapConverter(
- String typeSummary, SeaTunnelDataType<?> keyType,
SeaTunnelDataType<?> valueType) {
- if (!SqlType.STRING.equals(keyType.getSqlType())) {
- throw new SeaTunnelJsonFormatException(
- CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
- "JSON format doesn't support non-string as key type of
map. The type is: "
- + typeSummary);
- }
-
+ SeaTunnelDataType<?> keyType, SeaTunnelDataType<?> valueType) {
+ final RowToJsonConverter keyConverter = createConverter(keyType);
final RowToJsonConverter valueConverter = createConverter(valueType);
+
return new RowToJsonConverter() {
@Override
public JsonNode convert(ObjectMapper mapper, JsonNode reuse,
Object value) {
@@ -280,9 +286,12 @@ public class RowToJsonConverters implements Serializable {
node.removeAll();
}
- Map<String, ?> mapData = (Map) value;
- for (Map.Entry<String, ?> entry : mapData.entrySet()) {
- String fieldName = entry.getKey();
+ Map<?, ?> mapData = (Map) value;
+ for (Map.Entry<?, ?> entry : mapData.entrySet()) {
+ // Convert the key to a string using the key converter
+ JsonNode keyNode = keyConverter.convert(mapper, null,
entry.getKey());
+ String fieldName = keyNode.isTextual() ? keyNode.asText()
: keyNode.toString();
+
node.set(
fieldName,
valueConverter.convert(mapper,
node.get(fieldName), entry.getValue()));
diff --git
a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java
b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java
index ff1bb82005..fb6fd9da76 100644
---
a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java
+++
b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java
@@ -601,4 +601,81 @@ public class JsonRowDataSerDeSchemaTest {
"ErrorCode:[COMMON-33], ErrorDescription:[The datetime format
'2022-09-24-22:45:00' of field 'timestamp_field' is not supported. Please check
the datetime format.]",
exception2.getCause().getCause().getMessage());
}
+
+ @Test
+ public void testSerializationWithNullValue() {
+ SeaTunnelRowType schema =
+ new SeaTunnelRowType(
+ new String[] {
+ "bool", "int", "longValue", "float", "name",
"date", "time", "timestamp"
+ },
+ new SeaTunnelDataType[] {
+ BOOLEAN_TYPE,
+ INT_TYPE,
+ LONG_TYPE,
+ FLOAT_TYPE,
+ STRING_TYPE,
+ LocalTimeType.LOCAL_DATE_TYPE,
+ LocalTimeType.LOCAL_TIME_TYPE,
+ LocalTimeType.LOCAL_DATE_TIME_TYPE
+ });
+
+ Object[] fields = new Object[] {null, null, null, null, null, null,
null, null};
+ SeaTunnelRow expected = new SeaTunnelRow(fields);
+ assertEquals(
+
"{\"bool\":\"\\\\N\",\"int\":\"\\\\N\",\"longValue\":\"\\\\N\",\"float\":\"\\\\N\",\"name\":\"\\\\N\",\"date\":\"\\\\N\",\"time\":\"\\\\N\",\"timestamp\":\"\\\\N\"}",
+ new String(new JsonSerializationSchema(schema,
"\\N").serialize(expected)));
+ }
+
+ @Test
+ public void testSerializationWithMapHasNonStringKey() {
+ SeaTunnelRowType schema =
+ new SeaTunnelRowType(
+ new String[] {"mapii", "mapbb"},
+ new SeaTunnelDataType[] {
+ new MapType(INT_TYPE, INT_TYPE), new
MapType(BOOLEAN_TYPE, INT_TYPE)
+ });
+ Map<Integer, Integer> mapII = new HashMap<>();
+ mapII.put(1, 2);
+
+ Map<Boolean, Integer> mapBI = new HashMap<>();
+ mapBI.put(true, 3);
+
+ Object[] fields = new Object[] {mapII, mapBI};
+ SeaTunnelRow expected = new SeaTunnelRow(fields);
+ assertEquals(
+ "{\"mapii\":{\"1\":2},\"mapbb\":{\"true\":3}}",
+ new String(new JsonSerializationSchema(schema,
"\\N").serialize(expected)));
+ }
+
+ @Test
+ public void testSerializationWithTimestamp() {
+ SeaTunnelRowType schema =
+ new SeaTunnelRowType(
+ new String[] {"timestamp"},
+ new SeaTunnelDataType[]
{LocalTimeType.LOCAL_DATE_TIME_TYPE});
+ LocalDateTime timestamp = LocalDateTime.of(2022, 9, 24, 22, 45, 0,
123456000);
+ SeaTunnelRow row = new SeaTunnelRow(new Object[] {timestamp});
+ assertEquals(
+ "{\"timestamp\":\"2022-09-24T22:45:00.123456\"}",
+ new String(new JsonSerializationSchema(schema,
"\\N").serialize(row)));
+
+ timestamp = LocalDateTime.of(2022, 9, 24, 22, 45, 0, 0);
+ row = new SeaTunnelRow(new Object[] {timestamp});
+ assertEquals(
+ "{\"timestamp\":\"2022-09-24T22:45:00\"}",
+ new String(new JsonSerializationSchema(schema,
"\\N").serialize(row)));
+
+ timestamp = LocalDateTime.of(2022, 9, 24, 22, 45, 0, 1000);
+ row = new SeaTunnelRow(new Object[] {timestamp});
+ assertEquals(
+ "{\"timestamp\":\"2022-09-24T22:45:00.000001\"}",
+ new String(new JsonSerializationSchema(schema,
"\\N").serialize(row)));
+
+ timestamp = LocalDateTime.of(2022, 9, 24, 22, 45, 0, 123456);
+ row = new SeaTunnelRow(new Object[] {timestamp});
+ assertEquals(
+ "{\"timestamp\":\"2022-09-24T22:45:00.000123456\"}",
+ new String(new JsonSerializationSchema(schema,
"\\N").serialize(row)));
+ }
}
diff --git
a/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextSerializationSchema.java
b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextSerializationSchema.java
index 6f108ee295..01ca981a11 100644
---
a/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextSerializationSchema.java
+++
b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextSerializationSchema.java
@@ -48,6 +48,7 @@ public class TextSerializationSchema implements
SerializationSchema {
private final DateTimeUtils.Formatter dateTimeFormatter;
private final TimeUtils.Formatter timeFormatter;
private final Charset charset;
+ private final String nullValue;
private TextSerializationSchema(
@NonNull SeaTunnelRowType seaTunnelRowType,
@@ -55,13 +56,15 @@ public class TextSerializationSchema implements SerializationSchema {
DateUtils.Formatter dateFormatter,
DateTimeUtils.Formatter dateTimeFormatter,
TimeUtils.Formatter timeFormatter,
- Charset charset) {
+ Charset charset,
+ String nullValue) {
this.seaTunnelRowType = seaTunnelRowType;
this.separators = separators;
this.dateFormatter = dateFormatter;
this.dateTimeFormatter = dateTimeFormatter;
this.timeFormatter = timeFormatter;
this.charset = charset;
+ this.nullValue = nullValue;
}
public static Builder builder() {
@@ -76,6 +79,7 @@ public class TextSerializationSchema implements SerializationSchema {
DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS;
private TimeUtils.Formatter timeFormatter = TimeUtils.Formatter.HH_MM_SS;
private Charset charset = StandardCharsets.UTF_8;
+ private String nullValue = "";
private Builder() {}
@@ -114,6 +118,11 @@ public class TextSerializationSchema implements SerializationSchema {
return this;
}
+ public Builder nullValue(String nullValue) {
+ this.nullValue = nullValue;
+ return this;
+ }
+
public TextSerializationSchema build() {
return new TextSerializationSchema(
seaTunnelRowType,
@@ -121,7 +130,8 @@ public class TextSerializationSchema implements SerializationSchema {
dateFormatter,
dateTimeFormatter,
timeFormatter,
- charset);
+ charset,
+ nullValue);
}
}
@@ -141,7 +151,7 @@ public class TextSerializationSchema implements SerializationSchema {
private String convert(Object field, SeaTunnelDataType<?> fieldType, int level) {
if (field == null) {
- return "";
+ return nullValue;
}
switch (fieldType.getSqlType()) {
case DOUBLE:
diff --git a/seatunnel-formats/seatunnel-format-text/src/test/java/org/apache/seatunnel/format/text/CsvTextFormatSchemaTest.java b/seatunnel-formats/seatunnel-format-text/src/test/java/org/apache/seatunnel/format/text/CsvTextFormatSchemaTest.java
index 0f58e32f14..77c80a4bb8 100644
--- a/seatunnel-formats/seatunnel-format-text/src/test/java/org/apache/seatunnel/format/text/CsvTextFormatSchemaTest.java
+++ b/seatunnel-formats/seatunnel-format-text/src/test/java/org/apache/seatunnel/format/text/CsvTextFormatSchemaTest.java
@@ -25,6 +25,7 @@ import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.utils.DateTimeUtils.Formatter;
import org.apache.seatunnel.format.text.splitor.CsvLineSplitor;
import org.junit.jupiter.api.Assertions;
@@ -34,9 +35,12 @@ import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.math.BigDecimal;
import java.time.LocalDate;
+import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.Map;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
public class CsvTextFormatSchemaTest {
public String content =
"\"mess,age\","
@@ -150,4 +154,40 @@ public class CsvTextFormatSchemaTest {
Assertions.assertEquals(((Map<?, ?>) (seaTunnelRow.getField(15))).get("tyrantlucifer"), 18);
Assertions.assertEquals(((Map<?, ?>) (seaTunnelRow.getField(15))).get("Kris"), 21);
}
+
+ @Test
+ public void testSerializationWithTimestamp() {
+ String delimiter = ",";
+
+ SeaTunnelRowType schema =
+ new SeaTunnelRowType(
+ new String[] {"timestamp"},
+ new SeaTunnelDataType[] {LocalTimeType.LOCAL_DATE_TIME_TYPE});
+ LocalDateTime timestamp = LocalDateTime.of(2022, 9, 24, 22, 45, 0, 123456000);
+ TextSerializationSchema textSerializationSchema =
+ TextSerializationSchema.builder()
+ .seaTunnelRowType(schema)
+ .dateTimeFormatter(Formatter.YYYY_MM_DD_HH_MM_SS_SSSSSS)
+ .delimiter(delimiter)
+ .build();
+ SeaTunnelRow row = new SeaTunnelRow(new Object[] {timestamp});
+
+ assertEquals(
+ "2022-09-24 22:45:00.123456", new String(textSerializationSchema.serialize(row)));
+
+ timestamp = LocalDateTime.of(2022, 9, 24, 22, 45, 0, 0);
+ row = new SeaTunnelRow(new Object[] {timestamp});
+ assertEquals(
+ "2022-09-24 22:45:00.000000", new String(textSerializationSchema.serialize(row)));
+
+ timestamp = LocalDateTime.of(2022, 9, 24, 22, 45, 0, 1000);
+ row = new SeaTunnelRow(new Object[] {timestamp});
+ assertEquals(
+ "2022-09-24 22:45:00.000001", new String(textSerializationSchema.serialize(row)));
+
+ timestamp = LocalDateTime.of(2022, 9, 24, 22, 45, 0, 123456);
+ row = new SeaTunnelRow(new Object[] {timestamp});
+ assertEquals(
+ "2022-09-24 22:45:00.000123", new String(textSerializationSchema.serialize(row)));
+ }
}
diff --git a/seatunnel-formats/seatunnel-format-text/src/test/java/org/apache/seatunnel/format/text/TextFormatSchemaTest.java b/seatunnel-formats/seatunnel-format-text/src/test/java/org/apache/seatunnel/format/text/TextFormatSchemaTest.java
index 45574392d2..a8ab6decfa 100644
--- a/seatunnel-formats/seatunnel-format-text/src/test/java/org/apache/seatunnel/format/text/TextFormatSchemaTest.java
+++ b/seatunnel-formats/seatunnel-format-text/src/test/java/org/apache/seatunnel/format/text/TextFormatSchemaTest.java
@@ -36,6 +36,13 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
+import static org.apache.seatunnel.api.table.type.BasicType.BOOLEAN_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.FLOAT_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.INT_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.LONG_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
public class TextFormatSchemaTest {
public String content =
String.join("\u0002", Arrays.asList("1", "2", "3", "4", "5", "6"))
@@ -187,4 +194,38 @@ public class TextFormatSchemaTest {
"ErrorCode:[COMMON-33], ErrorDescription:[The datetime format '2022-09-24-22:45:00' of field 'timestamp_field' is not supported. Please check the datetime format.]",
exception2.getMessage());
}
+
+ @Test
+ public void testSerializationWithNullValue() throws Exception {
+ SeaTunnelRowType schema =
+ new SeaTunnelRowType(
+ new String[] {
+ "bool", "int", "longValue", "float", "name", "date", "time", "timestamp"
+ },
+ new SeaTunnelDataType[] {
+ BOOLEAN_TYPE,
+ INT_TYPE,
+ LONG_TYPE,
+ FLOAT_TYPE,
+ STRING_TYPE,
+ LocalTimeType.LOCAL_DATE_TYPE,
+ LocalTimeType.LOCAL_TIME_TYPE,
+ LocalTimeType.LOCAL_DATE_TIME_TYPE
+ });
+
+ Object[] fields = new Object[] {null, null, null, null, null, null, null, null};
+ SeaTunnelRow expected = new SeaTunnelRow(fields);
+
+ TextSerializationSchema textSerializationSchema =
+ TextSerializationSchema.builder()
+ .seaTunnelRowType(schema)
+ .delimiter("\u0001")
+ .nullValue("\\N")
+ .build();
+
+ System.out.println(new String(textSerializationSchema.serialize(expected)));
+ assertEquals(
+ "\\N\u0001\\N\u0001\\N\u0001\\N\u0001\\N\u0001\\N\u0001\\N\u0001\\N",
+ new String(textSerializationSchema.serialize(expected)));
+ }
}