This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new dd0573e2f4 [INLONG-9562][Sort] Support rowdata way of sort message CSV
format (#9621)
dd0573e2f4 is described below
commit dd0573e2f4b4e029d76d8d2d9b11004247837353
Author: baomingyu <[email protected]>
AuthorDate: Sun Jan 28 23:55:56 2024 +0800
[INLONG-9562][Sort] Support rowdata way of sort message CSV format (#9621)
---
.../format-rowdata/format-rowdata-csv/pom.xml | 147 +++++++++
.../inlong/sort/formats/csv/CsvFormatFactory.java | 235 ++++++++++++++
.../csv/CsvRowDataDeserializationSchema.java | 288 +++++++++++++++++
.../formats/csv/CsvRowDataSerializationSchema.java | 265 ++++++++++++++++
.../org.apache.flink.table.factories.Factory | 16 +
.../sort/formats/csv/CsvFormatFactoryTest.java | 147 +++++++++
.../csv/CsvRowDataDeserializationSchemaTest.java | 342 ++++++++++++++++++++
.../csv/CsvRowDataSerializationSchemaTest.java | 350 +++++++++++++++++++++
.../src/test/resources/log4j-test.properties | 27 ++
9 files changed, 1817 insertions(+)
diff --git a/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/pom.xml
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/pom.xml
new file mode 100644
index 0000000000..0789fb6a0e
--- /dev/null
+++ b/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/pom.xml
@@ -0,0 +1,147 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>format-rowdata</artifactId>
+ <version>1.11.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>sort-format-rowdata-csv</artifactId>
+ <packaging>jar</packaging>
+ <name>Apache InLong - Sort Format-RowData-CSV</name>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <maven.compiler.source>1.8</maven.compiler.source>
+ <maven.compiler.target>1.8</maven.compiler.target>
+
<inlong.root.dir>${project.parent.parent.parent.parent.basedir}</inlong.root.dir>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-format-rowdata-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-shaded-jackson</artifactId>
+ <version>${flink.jackson.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <!-- test dependencies -->
+
+ <!-- CSV table descriptor testing -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
+ <version>${flink.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- CSV RowData (de)serialization schema testing -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- CSV rowData encoder/intputformat testing -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <profiles>
+ <profile>
+ <id>japicmp-report</id>
+ <activation>
+ <property>
+ <name>japicmp-report</name>
+ </property>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>com.github.siom79.japicmp</groupId>
+ <artifactId>japicmp-maven-plugin</artifactId>
+ <configuration>
+ <parameter>
+
<breakBuildOnBinaryIncompatibleModifications>false</breakBuildOnBinaryIncompatibleModifications>
+
<breakBuildOnSourceIncompatibleModifications>false</breakBuildOnSourceIncompatibleModifications>
+ </parameter>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <profile>
+ <id>japicmp-check</id>
+ <activation>
+ <property>
+ <name>!japicmp-report</name>
+ </property>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>com.github.siom79.japicmp</groupId>
+ <artifactId>japicmp-maven-plugin</artifactId>
+ <configuration>
+ <parameter>
+ <excludes>
+
<exclude>org.apache.inlong.sort.flink.formats.csv.CsvRowDataDeserializationSchema</exclude>
+
<exclude>org.apache.inlong.sort.flink.formats.csv.CsvRowDataSerializationSchema</exclude>
+ </excludes>
+ </parameter>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+</project>
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/csv/CsvFormatFactory.java
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/csv/CsvFormatFactory.java
new file mode 100644
index 0000000000..f164c07eb8
--- /dev/null
+++
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/csv/CsvFormatFactory.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.csv;
+
+import org.apache.inlong.sort.formats.base.TableFormatForRowDataUtils;
+import org.apache.inlong.sort.formats.common.RowFormatInfo;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.types.DataType;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static
org.apache.inlong.sort.formats.base.TableFormatOptions.IGNORE_ERRORS;
+import static
org.apache.inlong.sort.formats.base.TableFormatOptions.ROW_FORMAT_INFO;
+import static
org.apache.inlong.sort.formats.base.TableFormatUtils.deserializeRowFormatInfo;
+import static org.apache.inlong.sort.formats.base.TextFormatOptions.CHARSET;
+import static
org.apache.inlong.sort.formats.base.TextFormatOptions.ESCAPE_CHARACTER;
+import static
org.apache.inlong.sort.formats.base.TextFormatOptions.FIELD_DELIMITER;
+import static
org.apache.inlong.sort.formats.base.TextFormatOptions.NULL_LITERAL;
+import static
org.apache.inlong.sort.formats.base.TextFormatOptions.QUOTE_CHARACTER;
+
+/**
+ * Table format factory for providing configured instances of CSV-to-row
+ * serializer and deserializer.
+ */
+public class CsvFormatFactory implements DeserializationFormatFactory,
SerializationFormatFactory {
+
+ public static final String IDENTIFIER = "inlong-csv";
+
+ @Override
+ public DecodingFormat<DeserializationSchema<RowData>>
createDecodingFormat(DynamicTableFactory.Context context,
+ ReadableConfig formatOptions) {
+ FactoryUtil.validateFactoryOptions(this, formatOptions);
+ validateFormatOptions(formatOptions);
+
+ return new DecodingFormat<DeserializationSchema<RowData>>() {
+
+ @Override
+ public DeserializationSchema<RowData> createRuntimeDecoder(
+ DynamicTableSource.Context context, DataType
producedDataType) {
+ final TypeInformation<RowData> rowDataTypeInfo =
+ context.createTypeInformation(producedDataType);
+ final CsvRowDataDeserializationSchema.Builder schemaBuilder =
+ new CsvRowDataDeserializationSchema.Builder(
+
deserializeRowFormatInfo(formatOptions.get(ROW_FORMAT_INFO)),
+ rowDataTypeInfo);
+ configureDeserializationSchema(formatOptions, schemaBuilder);
+ return schemaBuilder.build();
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode() {
+ return ChangelogMode.insertOnly();
+ }
+ };
+ }
+
+ @Override
+ public EncodingFormat<SerializationSchema<RowData>>
createEncodingFormat(DynamicTableFactory.Context context,
+ ReadableConfig formatOptions) {
+ FactoryUtil.validateFactoryOptions(this, formatOptions);
+ validateFormatOptions(formatOptions);
+
+ return new EncodingFormat<SerializationSchema<RowData>>() {
+
+ @Override
+ public SerializationSchema<RowData> createRuntimeEncoder(
+ DynamicTableSink.Context context, DataType
consumedDataType) {
+ final RowFormatInfo projectedRowFormatInfo =
TableFormatForRowDataUtils.projectRowFormatInfo(
+
deserializeRowFormatInfo(formatOptions.get(ROW_FORMAT_INFO)),
+ consumedDataType);
+ final CsvRowDataSerializationSchema.Builder schemaBuilder =
+ new
CsvRowDataSerializationSchema.Builder(projectedRowFormatInfo);
+ configureSerializationSchema(formatOptions, schemaBuilder);
+ return schemaBuilder.build();
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode() {
+ return ChangelogMode.insertOnly();
+ }
+ };
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ return Stream.of(ROW_FORMAT_INFO).collect(Collectors.toSet());
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(CHARSET);
+ options.add(FIELD_DELIMITER);
+ options.add(QUOTE_CHARACTER);
+ options.add(ESCAPE_CHARACTER);
+ options.add(NULL_LITERAL);
+ options.add(IGNORE_ERRORS);
+ return options;
+ }
+
+ // ------------------------------------------------------------------------
+ // Validation
+ // ------------------------------------------------------------------------
+
+ static void validateFormatOptions(ReadableConfig tableOptions) {
+ // Validate the option value must be a single char.
+ validateCharacterVal(tableOptions, FIELD_DELIMITER, true);
+ validateCharacterVal(tableOptions, QUOTE_CHARACTER);
+ validateCharacterVal(tableOptions, ESCAPE_CHARACTER);
+ }
+
+ /**
+ * Validates the option {@code option} value must be a Character.
+ */
+ private static void validateCharacterVal(
+ ReadableConfig tableOptions, ConfigOption<String> option) {
+ validateCharacterVal(tableOptions, option, false);
+ }
+
+ /**
+ * Validates the option {@code option} value must be a Character.
+ *
+ * @param tableOptions the table options
+ * @param option the config option
+ * @param unescape whether to unescape the option value
+ */
+ private static void validateCharacterVal(
+ ReadableConfig tableOptions, ConfigOption<String> option, boolean
unescape) {
+ if (tableOptions.getOptional(option).isPresent()) {
+ final String value =
+ unescape
+ ?
StringEscapeUtils.unescapeJava(tableOptions.get(option))
+ : tableOptions.get(option);
+ if (value.length() != 1) {
+ throw new ValidationException(
+ String.format(
+ "Option '%s.%s' must be a string with single
character, but was: %s",
+ IDENTIFIER, option.key(),
tableOptions.get(option)));
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ private static void configureDeserializationSchema(
+ ReadableConfig formatOptions,
CsvRowDataDeserializationSchema.Builder schemaBuilder) {
+
schemaBuilder.setCharset(formatOptions.getOptional(CHARSET).orElse(CHARSET.defaultValue()));
+
+ formatOptions
+ .getOptional(FIELD_DELIMITER)
+ .map(delimiter ->
StringEscapeUtils.unescapeJava(delimiter).charAt(0))
+ .ifPresent(schemaBuilder::setFieldDelimiter);
+
+ formatOptions
+ .getOptional(QUOTE_CHARACTER)
+ .map(quote -> quote.charAt(0))
+ .ifPresent(schemaBuilder::setQuoteCharacter);
+
+ formatOptions
+ .getOptional(ESCAPE_CHARACTER)
+ .map(escape -> escape.charAt(0))
+ .ifPresent(schemaBuilder::setEscapeCharacter);
+
+
formatOptions.getOptional(NULL_LITERAL).ifPresent(schemaBuilder::setNullLiteral);
+
+
formatOptions.getOptional(IGNORE_ERRORS).ifPresent(schemaBuilder::setIgnoreErrors);
+ }
+
+ private static void configureSerializationSchema(
+ ReadableConfig formatOptions,
+ CsvRowDataSerializationSchema.Builder schemaBuilder) {
+
schemaBuilder.setCharset(formatOptions.getOptional(CHARSET).orElse(CHARSET.defaultValue()));
+
+ formatOptions
+ .getOptional(FIELD_DELIMITER)
+ .map(delimiter ->
StringEscapeUtils.unescapeJava(delimiter).charAt(0))
+ .ifPresent(schemaBuilder::setFieldDelimiter);
+
+ formatOptions
+ .getOptional(QUOTE_CHARACTER)
+ .map(quote -> quote.charAt(0))
+ .ifPresent(schemaBuilder::setQuoteCharacter);
+
+ formatOptions
+ .getOptional(ESCAPE_CHARACTER)
+ .map(escape -> escape.charAt(0))
+ .ifPresent(schemaBuilder::setEscapeCharacter);
+
+
formatOptions.getOptional(NULL_LITERAL).ifPresent(schemaBuilder::setNullLiteral);
+
+
formatOptions.getOptional(IGNORE_ERRORS).ifPresent(schemaBuilder::setIgnoreErrors);
+ }
+}
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/csv/CsvRowDataDeserializationSchema.java
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/csv/CsvRowDataDeserializationSchema.java
new file mode 100644
index 0000000000..ee36e98126
--- /dev/null
+++
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/csv/CsvRowDataDeserializationSchema.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.csv;
+
+import org.apache.inlong.sort.formats.base.DefaultDeserializationSchema;
+import org.apache.inlong.sort.formats.base.FieldToRowDataConverters;
+import
org.apache.inlong.sort.formats.base.FieldToRowDataConverters.FieldToRowDataConverter;
+import org.apache.inlong.sort.formats.base.TableFormatUtils;
+import org.apache.inlong.sort.formats.common.FormatInfo;
+import org.apache.inlong.sort.formats.common.RowFormatInfo;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Objects;
+
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_CHARSET;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_DELIMITER;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_ESCAPE_CHARACTER;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_IGNORE_ERRORS;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_NULL_LITERAL;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_QUOTE_CHARACTER;
+import static org.apache.inlong.sort.formats.util.StringUtils.splitCsv;
+
+/**
+ * Deserialization schema from CSV to Flink Table & SQL internal data
structures.
+ *
+ * <p>Deserializes a <code>byte[]</code> message as a {@link JsonNode} and
converts it to {@link
+ * RowData}.
+ *
+ * <p>Failure during deserialization are forwarded as wrapped {@link
IOException}s.
+ */
+@Internal
+public final class CsvRowDataDeserializationSchema extends
DefaultDeserializationSchema<RowData> {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG =
LoggerFactory.getLogger(CsvRowDataDeserializationSchema.class);
+
+ /**
+ * Type information describing the result type.
+ */
+ @Nonnull
+ private final TypeInformation<RowData> resultTypeInfo;
+
+ /**
+ * Format information describing the result type.
+ */
+ @Nonnull
+ private final RowFormatInfo rowFormatInfo;
+
+ /**
+ * The charset of the text.
+ */
+ @Nonnull
+ private final String charset;
+
+ /**
+ * The delimiter between fields.
+ */
+ @Nonnull
+ private final Character delimiter;
+
+ /**
+ * Escape character. Null if escaping is disabled.
+ */
+ @Nullable
+ private final Character escapeChar;
+
+ /**
+ * Quote character. Null if quoting is disabled.
+ */
+ @Nullable
+ private final Character quoteChar;
+
+ /**
+ * The literal represented null values, default "".
+ */
+ @Nullable
+ private final String nullLiteral;
+
+ private final FieldToRowDataConverter[] converters;
+
+ public CsvRowDataDeserializationSchema(
+ @Nonnull TypeInformation<RowData> resultTypeInfo,
+ @Nonnull RowFormatInfo rowFormatInfo,
+ @Nonnull String charset,
+ @Nonnull Character delimiter,
+ @Nullable Character escapeChar,
+ @Nullable Character quoteChar,
+ @Nullable String nullLiteral,
+ Boolean ignoreErrors) {
+ super(ignoreErrors);
+ this.resultTypeInfo = resultTypeInfo;
+ this.rowFormatInfo = rowFormatInfo;
+ this.charset = charset;
+ this.delimiter = delimiter;
+ this.escapeChar = escapeChar;
+ this.quoteChar = quoteChar;
+ this.nullLiteral = nullLiteral;
+
+ converters = Arrays.stream(rowFormatInfo.getFieldFormatInfos())
+ .map(formatInfo -> FieldToRowDataConverters.createConverter(
+ TableFormatUtils.deriveLogicalType(formatInfo)))
+ .toArray(FieldToRowDataConverter[]::new);
+ }
+
+ /**
+ * A builder for creating a {@link CsvRowDataDeserializationSchema}.
+ */
+ @Internal
+ public static class Builder {
+
+ private final RowFormatInfo rowFormatInfo;
+
+ private final TypeInformation<RowData> resultTypeInfo;
+
+ private String charset = DEFAULT_CHARSET;
+
+ protected char delimiter = DEFAULT_DELIMITER;
+
+ private Character escapeChar = DEFAULT_ESCAPE_CHARACTER;
+
+ private Character quoteChar = DEFAULT_QUOTE_CHARACTER;
+
+ private String nullLiteral = DEFAULT_NULL_LITERAL;
+
+ private Boolean ignoreErrors = DEFAULT_IGNORE_ERRORS;
+
+ /**
+ * Creates a CSV deserialization schema for the given {@link
TypeInformation} with optional
+ * parameters.
+ */
+ public Builder(RowFormatInfo rowFormatInfo, TypeInformation<RowData>
resultTypeInfo) {
+ Preconditions.checkNotNull(rowFormatInfo, "RowFormatInfo must not
be null.");
+ Preconditions.checkNotNull(resultTypeInfo, "Result type
information must not be null.");
+ this.rowFormatInfo = rowFormatInfo;
+ this.resultTypeInfo = resultTypeInfo;
+ }
+
+ public Builder setCharset(String charset) {
+ this.charset = charset;
+ return this;
+ }
+
+ public Builder setFieldDelimiter(char delimiter) {
+ this.delimiter = delimiter;
+ return this;
+ }
+
+ public Builder setQuoteCharacter(char c) {
+ this.quoteChar = c;
+ return this;
+ }
+
+ public Builder setEscapeCharacter(char c) {
+ this.escapeChar = c;
+ return this;
+ }
+
+ public Builder setNullLiteral(String nullLiteral) {
+ Preconditions.checkNotNull(nullLiteral, "Null literal must not be
null.");
+ this.nullLiteral = nullLiteral;
+ return this;
+ }
+
+ public Builder setIgnoreErrors(Boolean ignoreErrors) {
+ this.ignoreErrors = ignoreErrors;
+ return this;
+ }
+
+ public CsvRowDataDeserializationSchema build() {
+ return new CsvRowDataDeserializationSchema(
+ resultTypeInfo,
+ rowFormatInfo,
+ charset,
+ delimiter,
+ escapeChar,
+ quoteChar,
+ nullLiteral,
+ ignoreErrors);
+ }
+ }
+
+ @Override
+ public RowData deserializeInternal(@Nullable byte[] message) {
+ if (message == null) {
+ return null;
+ }
+ String text = new String(message, Charset.forName(charset));
+
+ try {
+ String[] fieldNames = rowFormatInfo.getFieldNames();
+ FormatInfo[] fieldFormatInfos =
rowFormatInfo.getFieldFormatInfos();
+
+ String[] fieldTexts = splitCsv(text, delimiter, escapeChar,
quoteChar);
+ if (fieldTexts.length != fieldNames.length) {
+ LOG.warn("The number of fields mismatches: expected=[{}],
actual=[{}]. Text=[{}].",
+ fieldNames.length, fieldTexts.length, text);
+ }
+ GenericRowData rowData = new GenericRowData(fieldNames.length);
+
+ for (int i = 0; i < fieldNames.length; ++i) {
+ if (i >= fieldTexts.length) {
+ rowData.setField(i, null);
+ } else {
+ Object field =
+ TableFormatUtils.deserializeBasicField(
+ fieldNames[i],
+ fieldFormatInfos[i],
+ fieldTexts[i],
+ nullLiteral);
+
+ rowData.setField(i, converters[i].convert(field));
+ }
+ }
+
+ return rowData;
+ } catch (Throwable t) {
+ throw new RuntimeException(
+ String.format("Could not properly deserialize csv.
Text=[%s].", text), t);
+ }
+ }
+
+ @Override
+ public boolean isEndOfStream(RowData nextElement) {
+ return false;
+ }
+
+ @Override
+ public TypeInformation<RowData> getProducedType() {
+ return resultTypeInfo;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ CsvRowDataDeserializationSchema that =
(CsvRowDataDeserializationSchema) o;
+ return resultTypeInfo.equals(that.resultTypeInfo) &&
+ rowFormatInfo.equals(that.rowFormatInfo) &&
+ charset.equals(that.charset) &&
+ delimiter.equals(that.delimiter) &&
+ Objects.equals(escapeChar, that.escapeChar) &&
+ Objects.equals(quoteChar, that.quoteChar) &&
+ Objects.equals(nullLiteral, that.nullLiteral);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), resultTypeInfo, rowFormatInfo,
charset, delimiter, escapeChar, quoteChar,
+ nullLiteral);
+ }
+}
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/csv/CsvRowDataSerializationSchema.java
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/csv/CsvRowDataSerializationSchema.java
new file mode 100644
index 0000000000..9842e3cc98
--- /dev/null
+++
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/csv/CsvRowDataSerializationSchema.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.csv;
+
+import org.apache.inlong.sort.formats.base.DefaultSerializationSchema;
+import org.apache.inlong.sort.formats.base.RowDataToFieldConverters;
+import org.apache.inlong.sort.formats.base.TableFormatForRowDataUtils;
+import org.apache.inlong.sort.formats.common.FormatInfo;
+import org.apache.inlong.sort.formats.common.RowFormatInfo;
+
+import org.apache.flink.annotation.PublicEvolving;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.nio.charset.Charset;
+import java.util.Objects;
+
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_CHARSET;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_DELIMITER;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_ESCAPE_CHARACTER;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_IGNORE_ERRORS;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_NULL_LITERAL;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_QUOTE_CHARACTER;
+import static org.apache.inlong.sort.formats.util.StringUtils.concatCsv;
+
+/**
+ * Serialization schema that serializes an object of Flink Table & SQL
internal data structure into
+ * a CSV bytes.
+ *
+ * <p>Serializes the input row into a {@link JsonNode} and converts it into
<code>byte[]</code>.
+ *
+ * <p>Result <code>byte[]</code> messages can be deserialized using {@link
+ * CsvRowDataDeserializationSchema}.
+ */
+@PublicEvolving
+public final class CsvRowDataSerializationSchema extends
DefaultSerializationSchema<RowData> {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG =
LoggerFactory.getLogger(CsvRowDataDeserializationSchema.class);
+
+ /**
+ * Format information describing the result type.
+ */
+ @Nonnull
+ private final RowFormatInfo rowFormatInfo;
+
+ /**
+ * The charset of the text.
+ */
+ @Nonnull
+ private final String charset;
+
+ /**
+ * The delimiter between fields.
+ */
+ @Nonnull
+ private final Character delimiter;
+
+ /**
+ * Escape character. Null if escaping is disabled.
+ */
+ @Nullable
+ private final Character escapeChar;
+
+ /**
+ * Quote character. Null if quoting is disabled.
+ */
+ @Nullable
+ private final Character quoteChar;
+
+ /**
+ * The literal represented null values, default "".
+ */
+ @Nullable
+ private final String nullLiteral;
+
+ private final RowDataToFieldConverters.RowFieldConverter[]
rowFieldConverters;
+
+ public CsvRowDataSerializationSchema(
+ @Nonnull RowFormatInfo rowFormatInfo,
+ @Nonnull String charset,
+ @Nonnull Character delimiter,
+ @Nullable Character escapeChar,
+ @Nullable Character quoteChar,
+ @Nullable String nullLiteral,
+ Boolean ignoreErrors) {
+ super(ignoreErrors);
+ this.rowFormatInfo = rowFormatInfo;
+ this.charset = charset;
+ this.delimiter = delimiter;
+ this.escapeChar = escapeChar;
+ this.quoteChar = quoteChar;
+ this.nullLiteral = nullLiteral;
+
+ FormatInfo[] fieldFormatInfos = rowFormatInfo.getFieldFormatInfos();
+ rowFieldConverters = new
RowDataToFieldConverters.RowFieldConverter[fieldFormatInfos.length];
+ for (int i = 0; i < rowFieldConverters.length; i++) {
+ rowFieldConverters[i] =
RowDataToFieldConverters.createNullableRowFieldConverter(
+
TableFormatForRowDataUtils.deriveLogicalType(fieldFormatInfos[i]));
+ }
+ }
+
+ /**
+ * A builder for creating a {@link CsvRowDataSerializationSchema}.
+ */
+ @PublicEvolving
+ public static class Builder {
+
+ private final RowFormatInfo rowFormatInfo;
+
+ private String charset = DEFAULT_CHARSET;
+
+ protected char delimiter = DEFAULT_DELIMITER;
+
+ private Character escapeChar = DEFAULT_ESCAPE_CHARACTER;
+
+ private Character quoteChar = DEFAULT_QUOTE_CHARACTER;
+
+ private String nullLiteral = DEFAULT_NULL_LITERAL;
+
+ private Boolean ignoreErrors = DEFAULT_IGNORE_ERRORS;
+
+ /**
+ * Creates a {@link CsvRowDataSerializationSchema} expecting the given
{@link RowType}.
+ *
+ * @param rowFormatInfo logical row format info used to create schema.
+ */
+ public Builder(RowFormatInfo rowFormatInfo) {
+ Preconditions.checkNotNull(rowFormatInfo, "Row format info must
not be null.");
+ this.rowFormatInfo = rowFormatInfo;
+ }
+
+ public Builder setCharset(String charset) {
+ this.charset = charset;
+ return this;
+ }
+
+ public Builder setFieldDelimiter(char delimiter) {
+ this.delimiter = delimiter;
+ return this;
+ }
+
+ public Builder setQuoteCharacter(char c) {
+ this.quoteChar = c;
+ return this;
+ }
+
+ public Builder setEscapeCharacter(char c) {
+ this.escapeChar = c;
+ return this;
+ }
+
+ public Builder setNullLiteral(String nullLiteral) {
+ Preconditions.checkNotNull(nullLiteral, "Null literal must not be
null.");
+ this.nullLiteral = nullLiteral;
+ return this;
+ }
+
+ public Builder setIgnoreErrors(boolean ingoreErrors) {
+ this.ignoreErrors = ingoreErrors;
+ return this;
+ }
+
+ public CsvRowDataSerializationSchema build() {
+ return new CsvRowDataSerializationSchema(
+ rowFormatInfo,
+ charset,
+ delimiter,
+ escapeChar,
+ quoteChar,
+ nullLiteral,
+ ignoreErrors);
+ }
+ }
+
+ @Override
+ public byte[] serializeInternal(RowData rowData) {
+ if (rowData == null) {
+ return null;
+ }
+ try {
+ String[] fieldNames = rowFormatInfo.getFieldNames();
+ FormatInfo[] fieldFormatInfos =
rowFormatInfo.getFieldFormatInfos();
+
+ if (rowData.getArity() != fieldFormatInfos.length) {
+ LOG.warn("The number of fields mismatches: expected=[{}],
actual=[{}]. Row=[{}].",
+ fieldNames.length, rowData.getArity(), rowData);
+ }
+
+ String[] fieldTexts = new String[fieldNames.length];
+
+ // The extra fields will be dropped.
+ for (int i = 0; i < fieldNames.length; ++i) {
+ if (i >= rowData.getArity()) {
+ // The absent fields will be filled with null literal
+ fieldTexts[i] = nullLiteral == null ? "" : nullLiteral;
+ } else {
+ String fieldText =
+ TableFormatForRowDataUtils.serializeBasicField(
+ fieldNames[i],
+ fieldFormatInfos[i],
+ rowFieldConverters[i].convert(rowData, i),
+ nullLiteral);
+ fieldTexts[i] = fieldText;
+ }
+ }
+
+ String result =
+ concatCsv(fieldTexts, delimiter, escapeChar, quoteChar);
+
+ return result.getBytes(Charset.forName(charset));
+ } catch (Throwable t) {
+ throw new RuntimeException(
+ String.format("Could not properly serialize csv.
Row=[%s].", rowData), t);
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ CsvRowDataSerializationSchema that = (CsvRowDataSerializationSchema) o;
+ return rowFormatInfo.equals(that.rowFormatInfo) &&
+ charset.equals(that.charset) &&
+ delimiter.equals(that.delimiter) &&
+ Objects.equals(escapeChar, that.escapeChar) &&
+ Objects.equals(quoteChar, that.quoteChar) &&
+ Objects.equals(nullLiteral, that.nullLiteral);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), rowFormatInfo, charset,
delimiter, escapeChar, quoteChar, nullLiteral);
+ }
+}
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 0000000000..a2c39e1c06
--- /dev/null
+++
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.inlong.sort.formats.csv.CsvFormatFactory
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/csv/CsvFormatFactoryTest.java
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/csv/CsvFormatFactoryTest.java
new file mode 100644
index 0000000000..666340fdc9
--- /dev/null
+++
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/csv/CsvFormatFactoryTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.csv;
+
+import org.apache.inlong.sort.formats.common.DateFormatInfo;
+import org.apache.inlong.sort.formats.common.FloatFormatInfo;
+import org.apache.inlong.sort.formats.common.FormatInfo;
+import org.apache.inlong.sort.formats.common.FormatUtils;
+import org.apache.inlong.sort.formats.common.RowFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.TestDynamicTableFactory;
+import
org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.TestLogger;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static
org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
+import static
org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * The unit test for csv format factory.
+ */
+public class CsvFormatFactoryTest extends TestLogger {
+
+ public RowFormatInfo testFormatInfo;
+
+ public TypeInformation<RowData> testTypeInformation;
+
+ public ResolvedSchema resolvedSchema;
+
+ public DataType dataType;
+
+ @Before
+ public void setup() {
+ resolvedSchema =
+ ResolvedSchema.of(
+ Column.physical("student_name", DataTypes.STRING()),
+ Column.physical("score", DataTypes.FLOAT()),
+ Column.physical("date", DataTypes.DATE()));
+ dataType = resolvedSchema.toPhysicalRowDataType();
+ RowType rowType = (RowType) dataType.getLogicalType();
+ testTypeInformation = InternalTypeInfo.of(rowType);
+ testFormatInfo = new RowFormatInfo(
+ new String[]{"student_name", "score", "date"},
+ new FormatInfo[]{
+ StringFormatInfo.INSTANCE,
+ FloatFormatInfo.INSTANCE,
+ new DateFormatInfo("yyyy-MM-dd")
+ });
+ }
+
+ @Test
+ public void testDeSeSchema() {
+ final CsvRowDataDeserializationSchema expectedDeSer =
+ new CsvRowDataDeserializationSchema.Builder(
+ testFormatInfo, testTypeInformation)
+ .setCharset("UTF-8")
+ .setFieldDelimiter(';')
+ .setQuoteCharacter('\'')
+ .setEscapeCharacter('\\')
+ .setNullLiteral("n/a")
+ .build();
+ final Map<String, String> options = getAllOptions();
+ DeserializationSchema<RowData> actualDeser =
createDeserializationSchema(options);
+ assertEquals(expectedDeSer, actualDeser);
+
+ final CsvRowDataSerializationSchema expectedSer =
+ new CsvRowDataSerializationSchema.Builder(testFormatInfo)
+ .setCharset("UTF-8")
+ .setFieldDelimiter(';')
+ .setQuoteCharacter('\'')
+ .setEscapeCharacter('\\')
+ .setNullLiteral("n/a")
+ .build();
+ SerializationSchema<RowData> actualSer =
createSerializationSchema(options);
+ assertEquals(expectedSer, actualSer);
+ }
+
+ private Map<String, String> getAllOptions() {
+ final Map<String, String> options = new HashMap<>();
+ options.put("connector", TestDynamicTableFactory.IDENTIFIER);
+ options.put("target", "MyTarget");
+ options.put("buffer-size", "1000");
+
+ options.put("format", CsvFormatFactory.IDENTIFIER);
+ options.put("InLong-CSV.row.format.info",
FormatUtils.marshall(testFormatInfo));
+ options.put("InLong-CSV.format.field-delimiter", ";");
+ options.put("InLong-CSV.format.quote-character", "'");
+ options.put("InLong-CSV.format.escape-character", "\\");
+ options.put("InLong-CSV.format.null-literal", "n/a");
+ return options;
+ }
+
+ private DeserializationSchema<RowData> createDeserializationSchema(
+ Map<String, String> options) {
+ final DynamicTableSource actualSource =
createTableSource(resolvedSchema, options);
+ assert actualSource instanceof
TestDynamicTableFactory.DynamicTableSourceMock;
+ TestDynamicTableFactory.DynamicTableSourceMock sourceMock =
+ (TestDynamicTableFactory.DynamicTableSourceMock) actualSource;
+
+ return sourceMock.valueFormat.createRuntimeDecoder(
+ ScanRuntimeProviderContext.INSTANCE, dataType);
+ }
+
+ private SerializationSchema<RowData> createSerializationSchema(
+ Map<String, String> options) {
+ final DynamicTableSink actualSink = createTableSink(resolvedSchema,
options);
+ assert actualSink instanceof
TestDynamicTableFactory.DynamicTableSinkMock;
+ TestDynamicTableFactory.DynamicTableSinkMock sinkMock =
+ (TestDynamicTableFactory.DynamicTableSinkMock) actualSink;
+
+ return sinkMock.valueFormat.createRuntimeEncoder(null, dataType);
+ }
+
+}
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/csv/CsvRowDataDeserializationSchemaTest.java
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/csv/CsvRowDataDeserializationSchemaTest.java
new file mode 100644
index 0000000000..6d0c848e57
--- /dev/null
+++
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/csv/CsvRowDataDeserializationSchemaTest.java
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.csv;
+
+import org.apache.inlong.sort.formats.base.TableFormatUtils;
+import org.apache.inlong.sort.formats.common.BasicFormatInfo;
+import org.apache.inlong.sort.formats.common.BooleanFormatInfo;
+import org.apache.inlong.sort.formats.common.ByteFormatInfo;
+import org.apache.inlong.sort.formats.common.DateFormatInfo;
+import org.apache.inlong.sort.formats.common.DecimalFormatInfo;
+import org.apache.inlong.sort.formats.common.DoubleFormatInfo;
+import org.apache.inlong.sort.formats.common.FloatFormatInfo;
+import org.apache.inlong.sort.formats.common.FormatInfo;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.LongFormatInfo;
+import org.apache.inlong.sort.formats.common.RowFormatInfo;
+import org.apache.inlong.sort.formats.common.ShortFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.formats.common.TimeFormatInfo;
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.TestLogger;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.function.Consumer;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link CsvRowDataDeserializationSchema}.
+ */
+public class CsvRowDataDeserializationSchemaTest extends TestLogger {
+
+ public RowFormatInfo testFormatInfo;
+
+ public TypeInformation<RowData> testTypeInformation;
+
+ public ResolvedSchema resolvedSchema;
+
+ public DataType dataType;
+
+ @Before
+ public void setup() {
+ resolvedSchema =
+ ResolvedSchema.of(
+ Column.physical("f1", DataTypes.INT()),
+ Column.physical("f2", DataTypes.STRING()),
+ Column.physical("f3", DataTypes.STRING()),
+ Column.physical("f4", DataTypes.STRING()));
+ dataType = resolvedSchema.toPhysicalRowDataType();
+ RowType rowType = (RowType) dataType.getLogicalType();
+ testTypeInformation = InternalTypeInfo.of(rowType);
+ testFormatInfo = new RowFormatInfo(
+ new String[]{"f1", "f2", "f3", "f4"},
+ new FormatInfo[]{
+ IntFormatInfo.INSTANCE,
+ StringFormatInfo.INSTANCE,
+ StringFormatInfo.INSTANCE,
+ StringFormatInfo.INSTANCE
+ });
+ }
+
+ @Test
+ public void testNormal() throws Exception {
+
+ Consumer<CsvRowDataDeserializationSchema.Builder> config = builder -> {
+ };
+
+ testBasicDeserialization(config, StringFormatInfo.INSTANCE,
StringData.fromString("hello"), "hello");
+ testBasicDeserialization(config, BooleanFormatInfo.INSTANCE, true,
"true");
+ testBasicDeserialization(config, ByteFormatInfo.INSTANCE, (byte) 124,
"124");
+ testBasicDeserialization(config, ShortFormatInfo.INSTANCE, (short)
10000, "10000");
+ testBasicDeserialization(config, IntFormatInfo.INSTANCE, 1234567,
"1234567");
+ testBasicDeserialization(config, LongFormatInfo.INSTANCE,
12345678910L, "12345678910");
+ testBasicDeserialization(config, FloatFormatInfo.INSTANCE,
0.33333334f, "0.33333334");
+ testBasicDeserialization(config, DoubleFormatInfo.INSTANCE,
0.33333333332, "0.33333333332");
+ testBasicDeserialization(config, DecimalFormatInfo.INSTANCE,
+ DecimalData.fromBigDecimal(new
BigDecimal("1234.0000000000000000000000001"), 10, 0),
+ "1234.0000000000000000000000001");
+ testBasicDeserialization(config, new DateFormatInfo("dd/MM/yyyy"),
+ Date.valueOf("2020-03-22").toLocalDate().toEpochDay(),
"22/03/2020");
+ testBasicDeserialization(config, new TimeFormatInfo("ss/mm/hh"),
+ Time.valueOf("11:12:13").toLocalTime().toSecondOfDay() * 1000,
"13/12/11");
+ testBasicDeserialization(config, new TimestampFormatInfo("dd/MM/yyyy
hh:mm:ss"),
+ TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22
11:12:13")), "22/03/2020 11:12:13");
+ }
+
+ @Test
+ public void testNullLiteral() throws Exception {
+ String nullLiteral = "n/a";
+
+ Consumer<CsvRowDataDeserializationSchema.Builder> config =
+ builder -> builder.setNullLiteral(nullLiteral);
+
+ testBasicDeserialization(config, StringFormatInfo.INSTANCE, null,
nullLiteral);
+ testBasicDeserialization(config, BooleanFormatInfo.INSTANCE, null,
nullLiteral);
+ testBasicDeserialization(config, ByteFormatInfo.INSTANCE, null,
nullLiteral);
+ testBasicDeserialization(config, ShortFormatInfo.INSTANCE, null,
nullLiteral);
+ testBasicDeserialization(config, IntFormatInfo.INSTANCE, null,
nullLiteral);
+ testBasicDeserialization(config, LongFormatInfo.INSTANCE, null,
nullLiteral);
+ testBasicDeserialization(config, FloatFormatInfo.INSTANCE, null,
nullLiteral);
+ testBasicDeserialization(config, DoubleFormatInfo.INSTANCE, null,
nullLiteral);
+ testBasicDeserialization(config, DecimalFormatInfo.INSTANCE, null,
nullLiteral);
+ testBasicDeserialization(config, new DateFormatInfo("dd/MM/yyyy"),
null, nullLiteral);
+ testBasicDeserialization(config, new TimeFormatInfo("ss/mm/hh"), null,
nullLiteral);
+ testBasicDeserialization(config, new TimestampFormatInfo("dd/MM/yyyy
hh:mm:ss"), null, nullLiteral);
+ }
+
+ @Test
+ public void testDelimiter() throws Exception {
+ Consumer<CsvRowDataDeserializationSchema.Builder> config =
+ builder -> builder.setFieldDelimiter('|');
+ GenericRowData rowData = new GenericRowData(4);
+ rowData.setField(0, 10);
+ rowData.setField(1, StringData.fromString("field1"));
+ rowData.setField(2, StringData.fromString("field2"));
+ rowData.setField(3, StringData.fromString("field3"));
+
+ testRowDataDeserialization(
+ config,
+ rowData,
+ "10|field1|field2|field3".getBytes(),
+ false);
+ }
+
+ @Test
+ public void testEscape() throws Exception {
+ Consumer<CsvRowDataDeserializationSchema.Builder> config =
+ builder ->
builder.setEscapeCharacter('\\').setQuoteCharacter('\"');
+
+ GenericRowData rowData1 = new GenericRowData(4);
+ rowData1.setField(0, 10);
+ rowData1.setField(1, StringData.fromString("field1,field2"));
+ rowData1.setField(2, StringData.fromString("field3"));
+ rowData1.setField(3, StringData.fromString("field4"));
+ testRowDataDeserialization(
+ config,
+ rowData1,
+ "10,field1\\,field2,field3,field4".getBytes(),
+ false);
+
+ GenericRowData rowData2 = new GenericRowData(4);
+ rowData2.setField(0, 10);
+ rowData2.setField(1, StringData.fromString("field1\\"));
+ rowData2.setField(2, StringData.fromString("field2"));
+ rowData2.setField(3, StringData.fromString("field3"));
+ testRowDataDeserialization(
+ config,
+ rowData2,
+ "10,field1\\\\,field2,field3".getBytes(),
+ false);
+
+ GenericRowData rowData3 = new GenericRowData(4);
+ rowData3.setField(0, 10);
+ rowData3.setField(1, StringData.fromString("field1\""));
+ rowData3.setField(2, StringData.fromString("field2"));
+ rowData3.setField(3, StringData.fromString("field3"));
+ testRowDataDeserialization(
+ config,
+ rowData3,
+ "10,field1\\\",field2,field3".getBytes(),
+ false);
+ }
+
+ @Test
+ public void testQuote() throws Exception {
+ Consumer<CsvRowDataDeserializationSchema.Builder> config =
+ builder ->
builder.setEscapeCharacter('\\').setQuoteCharacter('\"');
+
+ GenericRowData rowData1 = new GenericRowData(4);
+ rowData1.setField(0, 10);
+ rowData1.setField(1, StringData.fromString("field1,field2"));
+ rowData1.setField(2, StringData.fromString("field3"));
+ rowData1.setField(3, StringData.fromString("field4"));
+ testRowDataDeserialization(
+ config,
+ rowData1,
+ "10,\"field1,field2\",field3,field4".getBytes(),
+ false);
+
+ GenericRowData rowData2 = new GenericRowData(4);
+ rowData2.setField(0, 10);
+ rowData2.setField(1, StringData.fromString("field1\\"));
+ rowData2.setField(2, StringData.fromString("field2"));
+ rowData2.setField(3, StringData.fromString("field3"));
+ testRowDataDeserialization(
+ config,
+ rowData2,
+ "10,\"field1\\\",field2,field3".getBytes(),
+ false);
+ }
+
+ @Test
+ public void testCharset() throws Exception {
+ Consumer<CsvRowDataDeserializationSchema.Builder> config =
+ builder -> builder.setCharset(StandardCharsets.UTF_16.name());
+
+ GenericRowData rowData = new GenericRowData(4);
+ rowData.setField(0, 10);
+ rowData.setField(1, StringData.fromString("field1"));
+ rowData.setField(2, StringData.fromString("field2"));
+ rowData.setField(3, StringData.fromString("field3"));
+ testRowDataDeserialization(
+ config,
+ rowData,
+ "10,field1,field2,field3".getBytes(StandardCharsets.UTF_16),
+ false);
+ }
+
+ @Test
+ public void testMoreFields() throws Exception {
+ Consumer<CsvRowDataDeserializationSchema.Builder> config = builder -> {
+ };
+
+ GenericRowData rowData = new GenericRowData(4);
+ rowData.setField(0, 1);
+ rowData.setField(1, StringData.fromString("field1"));
+ rowData.setField(2, StringData.fromString("field2"));
+ rowData.setField(3, StringData.fromString("field3"));
+ testRowDataDeserialization(
+ config,
+ rowData,
+ "1,field1,field2,field3,field4".getBytes(),
+ false);
+ }
+
+ @Test
+ public void testLessFields() throws Exception {
+ Consumer<CsvRowDataDeserializationSchema.Builder> config = builder -> {
+ };
+ GenericRowData rowData = new GenericRowData(4);
+ rowData.setField(0, 1);
+ rowData.setField(1, StringData.fromString("field1"));
+ rowData.setField(2, StringData.fromString("field2"));
+ rowData.setField(3, null);
+ testRowDataDeserialization(
+ config,
+ rowData,
+ "1,field1,field2".getBytes(),
+ false);
+ }
+
+ @Test(expected = Exception.class)
+ public void testErrors() throws Exception {
+ Consumer<CsvRowDataDeserializationSchema.Builder> config = builder -> {
+ };
+ GenericRowData rowData = new GenericRowData(4);
+ rowData.setField(0, null);
+ rowData.setField(1, StringData.fromString("field1"));
+ rowData.setField(2, StringData.fromString("field2"));
+ rowData.setField(3, StringData.fromString("field3"));
+ testRowDataDeserialization(
+ config,
+ rowData,
+ "na,field1,field2,field3".getBytes(),
+ false);
+ }
+
+ @Test
+ public void testIgnoreErrors() throws Exception {
+ Consumer<CsvRowDataDeserializationSchema.Builder> config = builder -> {
+ };
+ testRowDataDeserialization(
+ config,
+ null,
+ "na,field1,field2,field3".getBytes(),
+ true);
+ }
+
+ private <T> void testBasicDeserialization(
+ Consumer<CsvRowDataDeserializationSchema.Builder> config,
+ BasicFormatInfo<T> basicFormatInfo,
+ Object expectedRecord,
+ String text) throws IOException {
+ LogicalType logicalType =
TableFormatUtils.deriveLogicalType(basicFormatInfo);
+ TypeInformation<RowData> typeInformation =
InternalTypeInfo.of(logicalType);
+ RowFormatInfo rowFormatInfo =
+ new RowFormatInfo(
+ new String[]{"f"},
+ new FormatInfo[]{basicFormatInfo});
+
+ CsvRowDataDeserializationSchema.Builder builder =
+ new CsvRowDataDeserializationSchema.Builder(rowFormatInfo,
typeInformation);
+ config.accept(builder);
+
+ CsvRowDataDeserializationSchema deserializer = builder.build();
+
+ GenericRowData row = (GenericRowData)
deserializer.deserialize(text.getBytes());
+ assertEquals(1, row.getArity());
+ assertEquals(expectedRecord, row.getField(0));
+ }
+
+ private void testRowDataDeserialization(
+ Consumer<CsvRowDataDeserializationSchema.Builder> config,
+ RowData expectedRow,
+ byte[] bytes,
+ boolean ignoreErrors) throws Exception {
+ CsvRowDataDeserializationSchema.Builder builder =
+ new CsvRowDataDeserializationSchema.Builder(testFormatInfo,
testTypeInformation);
+ builder.setIgnoreErrors(ignoreErrors);
+ config.accept(builder);
+
+ CsvRowDataDeserializationSchema deserializer = builder.build();
+ GenericRowData rowData = (GenericRowData)
deserializer.deserialize(bytes);
+ assertEquals(expectedRow, rowData);
+ }
+}
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/csv/CsvRowDataSerializationSchemaTest.java
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/csv/CsvRowDataSerializationSchemaTest.java
new file mode 100644
index 0000000000..d97a8a8fe7
--- /dev/null
+++
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/csv/CsvRowDataSerializationSchemaTest.java
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.csv;
+
+import org.apache.inlong.sort.formats.common.BasicFormatInfo;
+import org.apache.inlong.sort.formats.common.BooleanFormatInfo;
+import org.apache.inlong.sort.formats.common.ByteFormatInfo;
+import org.apache.inlong.sort.formats.common.DateFormatInfo;
+import org.apache.inlong.sort.formats.common.DecimalFormatInfo;
+import org.apache.inlong.sort.formats.common.DoubleFormatInfo;
+import org.apache.inlong.sort.formats.common.FloatFormatInfo;
+import org.apache.inlong.sort.formats.common.FormatInfo;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.LongFormatInfo;
+import org.apache.inlong.sort.formats.common.RowFormatInfo;
+import org.apache.inlong.sort.formats.common.ShortFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.formats.common.TimeFormatInfo;
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.function.Consumer;
+
+import static org.apache.flink.table.data.DecimalData.fromBigDecimal;
+import static org.apache.flink.table.data.StringData.fromString;
+import static org.apache.flink.table.data.TimestampData.fromTimestamp;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link CsvRowDataSerializationSchema}.
+ */
+public class CsvRowDataSerializationSchemaTest {
+
+ public RowFormatInfo testFormatInfo;
+
+ public TypeInformation<RowData> testTypeInformation;
+
+ public ResolvedSchema resolvedSchema;
+
+ public DataType dataType;
+
+ @Before
+ public void setup() {
+ resolvedSchema =
+ ResolvedSchema.of(
+ Column.physical("f1", DataTypes.INT()),
+ Column.physical("f2", DataTypes.STRING()),
+ Column.physical("f3", DataTypes.STRING()),
+ Column.physical("f4", DataTypes.STRING()));
+ dataType = resolvedSchema.toPhysicalRowDataType();
+ RowType rowType = (RowType) dataType.getLogicalType();
+ testTypeInformation = InternalTypeInfo.of(rowType);
+ testFormatInfo = new RowFormatInfo(
+ new String[]{"f1", "f2", "f3", "f4"},
+ new FormatInfo[]{
+ IntFormatInfo.INSTANCE,
+ StringFormatInfo.INSTANCE,
+ StringFormatInfo.INSTANCE,
+ StringFormatInfo.INSTANCE
+ });
+ }
+
+ @Test
+ public void testNormal() {
+ Consumer<CsvRowDataSerializationSchema.Builder> config = builder -> {
+ };
+
+ testBasicSerialization(config, StringFormatInfo.INSTANCE, "hello",
"hello");
+ testBasicSerialization(config, BooleanFormatInfo.INSTANCE, true,
"true");
+ testBasicSerialization(config, ByteFormatInfo.INSTANCE, (byte) 124,
"124");
+ testBasicSerialization(config, ShortFormatInfo.INSTANCE, (short)
10000, "10000");
+ testBasicSerialization(config, IntFormatInfo.INSTANCE, 1234567,
"1234567");
+ testBasicSerialization(config, LongFormatInfo.INSTANCE, 12345678910L,
"12345678910");
+ testBasicSerialization(config, FloatFormatInfo.INSTANCE, 0.33333334f,
"0.33333334");
+ testBasicSerialization(config, DoubleFormatInfo.INSTANCE,
0.33333333332, "0.33333333332");
+ testBasicSerialization(config, DecimalFormatInfo.INSTANCE, new
BigDecimal("1234.0000000000000000000000001"),
+ "1234.0000000000000000000000001");
+ testBasicSerialization(config, new DateFormatInfo("dd/MM/yyyy"),
Date.valueOf("2020-03-22"), "22/03/2020");
+ testBasicSerialization(config, new TimeFormatInfo("ss/mm/hh"),
Time.valueOf("11:12:13"), "13/12/11");
+ testBasicSerialization(config, new TimestampFormatInfo("dd/MM/yyyy
hh:mm:ss"),
+ Timestamp.valueOf("2020-03-22 11:12:13"), "22/03/2020
11:12:13");
+ }
+
+ @Test
+ public void testNullLiteral() {
+ String nullLiteral = "n/a";
+
+ Consumer<CsvRowDataSerializationSchema.Builder> config =
+ builder -> builder.setNullLiteral(nullLiteral);
+
+ testBasicSerialization(config, StringFormatInfo.INSTANCE, null,
nullLiteral);
+ testBasicSerialization(config, BooleanFormatInfo.INSTANCE, null,
nullLiteral);
+ testBasicSerialization(config, ByteFormatInfo.INSTANCE, null,
nullLiteral);
+ testBasicSerialization(config, ShortFormatInfo.INSTANCE, null,
nullLiteral);
+ testBasicSerialization(config, IntFormatInfo.INSTANCE, null,
nullLiteral);
+ testBasicSerialization(config, LongFormatInfo.INSTANCE, null,
nullLiteral);
+ testBasicSerialization(config, FloatFormatInfo.INSTANCE, null,
nullLiteral);
+ testBasicSerialization(config, DoubleFormatInfo.INSTANCE, null,
nullLiteral);
+ testBasicSerialization(config, DecimalFormatInfo.INSTANCE, null,
nullLiteral);
+ testBasicSerialization(config, new DateFormatInfo("dd/MM/yyyy"), null,
nullLiteral);
+ testBasicSerialization(config, new TimeFormatInfo("ss/mm/hh"), null,
nullLiteral);
+ testBasicSerialization(config, new TimestampFormatInfo("dd/MM/yyyy
hh:mm:ss"), null, nullLiteral);
+ }
+
+ @Test
+ public void testDelimiter() {
+ Consumer<CsvRowDataSerializationSchema.Builder> config =
+ builder -> builder.setFieldDelimiter('|');
+ GenericRowData rowData = GenericRowData.of(
+ 10,
+ fromString("field1"),
+ fromString("field2"),
+ fromString("field3"));
+
+ testRowDataSerialization(
+ config,
+ rowData,
+ "10|field1|field2|field3".getBytes(),
+ false);
+ }
+
+ @Test
+ public void testEscape() {
+ Consumer<CsvRowDataSerializationSchema.Builder> config =
+ builder ->
builder.setEscapeCharacter('\\').setQuoteCharacter('\"');
+
+ GenericRowData rowData1 = GenericRowData.of(
+ 10,
+ fromString("field1,field2"),
+ fromString("field3"),
+ fromString("field4"));
+ testRowDataSerialization(
+ config,
+ rowData1,
+ "10,field1\\,field2,field3,field4".getBytes(),
+ false);
+
+ GenericRowData rowData2 = GenericRowData.of(
+ 10,
+ fromString("field1\\"),
+ fromString("field2"),
+ fromString("field3"));
+ testRowDataSerialization(
+ config,
+ rowData2,
+ "10,field1\\\\,field2,field3".getBytes(),
+ false);
+
+ GenericRowData rowData3 = GenericRowData.of(
+ 10,
+ fromString("field1\""),
+ fromString("field2"),
+ fromString("field3"));
+ testRowDataSerialization(
+ config,
+ rowData3,
+ "10,field1\\\",field2,field3".getBytes(),
+ false);
+ }
+
+ @Test
+ public void testQuote() {
+ Consumer<CsvRowDataSerializationSchema.Builder> config =
+ builder -> builder.setQuoteCharacter('\"');
+
+ GenericRowData rowData = GenericRowData.of(
+ 10,
+ fromString("field1,field2"),
+ fromString("field3"),
+ fromString("field4"));
+ testRowDataSerialization(
+ config,
+ rowData,
+ "10,field1\",\"field2,field3,field4".getBytes(),
+ false);
+ }
+
+ @Test
+ public void testCharset() {
+ Consumer<CsvRowDataSerializationSchema.Builder> config =
+ builder -> builder.setCharset(StandardCharsets.UTF_16.name());
+
+ GenericRowData rowData = GenericRowData.of(
+ 10,
+ fromString("field1"),
+ fromString("field2"),
+ fromString("field3"));
+ testRowDataSerialization(
+ config,
+ rowData,
+ "10,field1,field2,field3".getBytes(StandardCharsets.UTF_16),
+ false);
+ }
+
+ @Test
+ public void testMoreFields() {
+ Consumer<CsvRowDataSerializationSchema.Builder> config = builder -> {
+ };
+
+ GenericRowData rowData = GenericRowData.of(
+ 10,
+ fromString("field1"),
+ fromString("field2"),
+ fromString("field3"),
+ fromString("field4")
+
+ );
+ testRowDataSerialization(
+ config,
+ rowData,
+ "10,field1,field2,field3".getBytes(),
+ false);
+ }
+
+ @Test
+ public void testLessFields() {
+ Consumer<CsvRowDataSerializationSchema.Builder> config = builder -> {
+ };
+
+ GenericRowData rowData = GenericRowData.of(
+ 10,
+ fromString("field1"),
+ fromString("field2")
+
+ );
+ testRowDataSerialization(
+ config,
+ rowData,
+ "10,field1,field2,".getBytes(),
+ false);
+ }
+
+ @Test(expected = Exception.class)
+ public void testErrors() {
+ Consumer<CsvRowDataSerializationSchema.Builder> config = builder -> {
+ };
+ GenericRowData rowData = GenericRowData.of(
+ fromString("na"),
+ fromString("field1"),
+ fromString("field2"),
+ fromString("field3")
+
+ );
+ testRowDataSerialization(
+ config,
+ rowData,
+ ",field1,field2,field3".getBytes(),
+ false);
+ }
+
+ @Test
+ public void testIngoreErrors() {
+ Consumer<CsvRowDataSerializationSchema.Builder> config = builder -> {
+ };
+ GenericRowData rowData = GenericRowData.of(
+ fromString("na"),
+ fromString("field1"),
+ fromString("field2"),
+ fromString("field3")
+
+ );
+ testRowDataSerialization(
+ config,
+ rowData,
+ null,
+ true);
+ }
+
+ private <T> void testBasicSerialization(
+ Consumer<CsvRowDataSerializationSchema.Builder> config,
+ BasicFormatInfo<T> basicFormatInfo,
+ T record,
+ String expectedText) {
+ RowFormatInfo rowFormatInfo =
+ new RowFormatInfo(
+ new String[]{"f"},
+ new FormatInfo[]{basicFormatInfo});
+
+ CsvRowDataSerializationSchema.Builder builder =
+ new CsvRowDataSerializationSchema.Builder(rowFormatInfo);
+ config.accept(builder);
+
+ CsvRowDataSerializationSchema serializer = builder.build();
+
+ GenericRowData rowData = new GenericRowData(1);
+ if (record instanceof String) {
+ rowData.setField(0, fromString((String) record));
+ } else if (record instanceof BigDecimal) {
+ rowData.setField(0, fromBigDecimal((BigDecimal) record, 30, 25));
+ } else if (record instanceof Timestamp) {
+ rowData.setField(0, fromTimestamp((Timestamp) record));
+ } else if (record instanceof Date) {
+ rowData.setField(0, ((Date) record).toLocalDate().toEpochDay());
+ } else if (record instanceof Time) {
+ rowData.setField(0, ((Time) record).toLocalTime().toSecondOfDay()
* 1000);
+ } else {
+ rowData.setField(0, record);
+ }
+ String text = new String(serializer.serialize(rowData));
+ assertEquals(expectedText, text);
+ }
+
+ private void testRowDataSerialization(
+ Consumer<CsvRowDataSerializationSchema.Builder> config,
+ RowData rowData,
+ byte[] expectedBytes,
+ boolean ignoreErrors) {
+ CsvRowDataSerializationSchema.Builder builder =
+ new CsvRowDataSerializationSchema.Builder(testFormatInfo);
+ builder.setIgnoreErrors(ignoreErrors);
+ config.accept(builder);
+
+ CsvRowDataSerializationSchema serializer = builder.build();
+
+ byte[] bytes = serializer.serialize(rowData);
+ assertArrayEquals(expectedBytes, bytes);
+ }
+}
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/test/resources/log4j-test.properties
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000000..881dc0609b
--- /dev/null
+++
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to DEBUG and its only appender to A1.
+log4j.rootLogger=OFF, A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n