This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 09f9429624 [INLONG-9569][Sort] Support rowdata way of sort InLong 
message csv format (#9646)
09f9429624 is described below

commit 09f94296242121f73713c9631fa717409c28c4ff
Author: baomingyu <[email protected]>
AuthorDate: Sun Feb 18 19:27:17 2024 +0800

    [INLONG-9569][Sort] Support rowdata way of sort InLong message csv format 
(#9646)
---
 .../pom.xml                                        |  65 +-
 .../inlongmsgcsv/InLongMsgCsvDecodingFormat.java   | 119 +++
 .../InLongMsgCsvFormatDeserializer.java            | 385 +++++++++
 .../inlongmsgcsv/InLongMsgCsvFormatFactory.java    | 136 ++++
 .../InLongMsgCsvRowDataDeserializationSchema.java  | 153 ++++
 .../formats/inlongmsgcsv/InLongMsgCsvUtils.java    | 194 +++++
 .../org.apache.flink.table.factories.Factory       |  16 +
 .../InLongMsgCsvFormatDeserializerTest.java        | 861 +++++++++++++++++++++
 .../InLongMsgCsvFormatFactoryTest.java             | 127 +++
 .../src/test/resources/log4j-test.properties       |  27 +
 .../format-rowdata/format-rowdata-csv/pom.xml      |  48 --
 .../format-rowdata/format-rowdata-json/pom.xml     |  48 --
 .../format-rowdata/format-rowdata-kv/pom.xml       |  48 --
 inlong-sort/sort-formats/format-rowdata/pom.xml    |   1 +
 14 files changed, 2023 insertions(+), 205 deletions(-)

diff --git a/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/pom.xml 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/pom.xml
similarity index 56%
copy from inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/pom.xml
copy to 
inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/pom.xml
index d3f477164b..8bf150aa72 100644
--- a/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/pom.xml
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/pom.xml
@@ -26,21 +26,18 @@
         <version>1.12.0-SNAPSHOT</version>
     </parent>
 
-    <artifactId>sort-format-rowdata-csv</artifactId>
+    <artifactId>sort-format-inlongmsg-rowdata-csv</artifactId>
     <packaging>jar</packaging>
-    <name>Apache InLong - Sort Format-RowData-CSV</name>
+    <name>Apache InLong - Sort Format-InLongMsg-RowData-CSV</name>
 
     <properties>
-        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <maven.compiler.source>1.8</maven.compiler.source>
-        <maven.compiler.target>1.8</maven.compiler.target>
         
<inlong.root.dir>${project.parent.parent.parent.parent.basedir}</inlong.root.dir>
     </properties>
 
     <dependencies>
         <dependency>
             <groupId>org.apache.inlong</groupId>
-            <artifactId>sort-format-rowdata-base</artifactId>
+            <artifactId>sort-format-inlongmsg-rowdata-base</artifactId>
             <version>${project.version}</version>
         </dependency>
         <dependency>
@@ -49,14 +46,8 @@
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-shaded-jackson</artifactId>
-            <version>${flink.jackson.version}</version>
-            <scope>provided</scope>
-        </dependency>
-        <!-- test dependencies -->
 
+        <!-- test dependencies -->
         <!-- CSV table descriptor testing -->
         <dependency>
             <groupId>org.apache.flink</groupId>
@@ -96,52 +87,4 @@
             <scope>test</scope>
         </dependency>
     </dependencies>
-    <profiles>
-        <profile>
-            <id>japicmp-report</id>
-            <activation>
-                <property>
-                    <name>japicmp-report</name>
-                </property>
-            </activation>
-            <build>
-                <plugins>
-                    <plugin>
-                        <groupId>com.github.siom79.japicmp</groupId>
-                        <artifactId>japicmp-maven-plugin</artifactId>
-                        <configuration>
-                            <parameter>
-                                
<breakBuildOnBinaryIncompatibleModifications>false</breakBuildOnBinaryIncompatibleModifications>
-                                
<breakBuildOnSourceIncompatibleModifications>false</breakBuildOnSourceIncompatibleModifications>
-                            </parameter>
-                        </configuration>
-                    </plugin>
-                </plugins>
-            </build>
-        </profile>
-        <profile>
-            <id>japicmp-check</id>
-            <activation>
-                <property>
-                    <name>!japicmp-report</name>
-                </property>
-            </activation>
-            <build>
-                <plugins>
-                    <plugin>
-                        <groupId>com.github.siom79.japicmp</groupId>
-                        <artifactId>japicmp-maven-plugin</artifactId>
-                        <configuration>
-                            <parameter>
-                                <excludes>
-                                    
<exclude>org.apache.inlong.sort.flink.formats.csv.CsvRowDataDeserializationSchema</exclude>
-                                    
<exclude>org.apache.inlong.sort.flink.formats.csv.CsvRowDataSerializationSchema</exclude>
-                                </excludes>
-                            </parameter>
-                        </configuration>
-                    </plugin>
-                </plugins>
-            </build>
-        </profile>
-    </profiles>
 </project>
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvDecodingFormat.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvDecodingFormat.java
new file mode 100644
index 0000000000..7898367647
--- /dev/null
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvDecodingFormat.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsgcsv;
+
+import 
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgDecodingFormat;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+
+import static 
org.apache.inlong.sort.formats.base.TableFormatOptions.ROW_FORMAT_INFO;
+import static 
org.apache.inlong.sort.formats.base.TableFormatUtils.deserializeRowFormatInfo;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.ATTRIBUTE_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.CHARSET;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.DELETE_HEAD_DELIMITER;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.ESCAPE_CHARACTER;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.FIELD_DELIMITER;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.IGNORE_ERRORS;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.LINE_DELIMITER;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.NULL_LITERAL;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.QUOTE_CHARACTER;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.TIME_FIELD_NAME;
+
+/**
+ * Decoding format for the InLongMsg CSV format. It turns the table's format options into a
+ * configured {@link InLongMsgCsvRowDataDeserializationSchema}.
+ */
+public class InLongMsgCsvDecodingFormat extends AbstractInLongMsgDecodingFormat {
+
+    /** Format options handed over at construction; consulted when the runtime decoder is built. */
+    private final ReadableConfig formatOptions;
+
+    /**
+     * Creates the decoding format.
+     *
+     * @param formatOptions the options the deserialization schema is configured from
+     */
+    public InLongMsgCsvDecodingFormat(ReadableConfig formatOptions) {
+        this.formatOptions = formatOptions;
+    }
+
+    /**
+     * Builds the deserialization schema from the row format info and the remaining format options.
+     *
+     * @param context  the source context (not used by this implementation)
+     * @param dataType the requested data type (not used by this implementation)
+     * @return the schema produced by the configured builder
+     */
+    @Override
+    public DeserializationSchema<RowData> createRuntimeDecoder(
+            DynamicTableSource.Context context, DataType dataType) {
+        InLongMsgCsvRowDataDeserializationSchema.Builder builder =
+                new InLongMsgCsvRowDataDeserializationSchema.Builder(
+                        deserializeRowFormatInfo(formatOptions.get(ROW_FORMAT_INFO)));
+        configureDeserializationSchema(formatOptions, builder);
+        return builder.build();
+    }
+
+    /**
+     * @return {@link ChangelogMode#insertOnly()}; this format only emits insert rows
+     */
+    @Override
+    public ChangelogMode getChangelogMode() {
+        return ChangelogMode.insertOnly();
+    }
+
+    /**
+     * Copies every option that is present in {@code formatOptions} onto {@code schemaBuilder}.
+     * Options that are absent leave the builder's own defaults untouched, except for the
+     * charset, which is always set (explicit value or the option's default).
+     *
+     * @param formatOptions the format options to read
+     * @param schemaBuilder the builder to configure
+     */
+    private void configureDeserializationSchema(
+            ReadableConfig formatOptions,
+            InLongMsgCsvRowDataDeserializationSchema.Builder schemaBuilder) {
+        // The charset is applied unconditionally here, so no separate "if present" pass is
+        // needed for it further down.
+        // NOTE(review): metadataKeys is not declared in this class; presumably it is inherited
+        // from AbstractInLongMsgDecodingFormat - confirm.
+        schemaBuilder.setCharset(formatOptions.getOptional(CHARSET).orElse(CHARSET.defaultValue()))
+                .setMetadataKeys(metadataKeys);
+
+        formatOptions
+                .getOptional(TIME_FIELD_NAME)
+                .ifPresent(schemaBuilder::setTimeFieldName);
+
+        formatOptions
+                .getOptional(ATTRIBUTE_FIELD_NAME)
+                .ifPresent(schemaBuilder::setAttributesFieldName);
+
+        // Delimiters may be written with Java escapes (e.g. "\t"); only the first character of
+        // the unescaped value is used.
+        formatOptions
+                .getOptional(LINE_DELIMITER)
+                .map(delimiter -> StringEscapeUtils.unescapeJava(delimiter).charAt(0))
+                .ifPresent(schemaBuilder::setLineDelimiter);
+
+        formatOptions
+                .getOptional(FIELD_DELIMITER)
+                .map(delimiter -> StringEscapeUtils.unescapeJava(delimiter).charAt(0))
+                .ifPresent(schemaBuilder::setFieldDelimiter);
+
+        // Quote and escape characters are taken literally (no unescaping).
+        formatOptions
+                .getOptional(QUOTE_CHARACTER)
+                .map(quote -> quote.charAt(0))
+                .ifPresent(schemaBuilder::setQuoteCharacter);
+
+        formatOptions
+                .getOptional(ESCAPE_CHARACTER)
+                .map(escape -> escape.charAt(0))
+                .ifPresent(schemaBuilder::setEscapeCharacter);
+
+        formatOptions
+                .getOptional(NULL_LITERAL)
+                .ifPresent(schemaBuilder::setNullLiteral);
+
+        formatOptions
+                .getOptional(IGNORE_ERRORS)
+                .ifPresent(schemaBuilder::setIgnoreErrors);
+
+        formatOptions
+                .getOptional(DELETE_HEAD_DELIMITER)
+                .ifPresent(schemaBuilder::setDeleteHeadDelimiter);
+
+        // NOTE(review): the retain-predefined-field option is not forwarded to the builder
+        // here - confirm whether the schema builder exposes a setter for it and should
+        // receive the configured value.
+    }
+}
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java
new file mode 100644
index 0000000000..60417f9ce6
--- /dev/null
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsgcsv;
+
+import org.apache.inlong.sort.formats.base.FieldToRowDataConverters;
+import 
org.apache.inlong.sort.formats.base.FieldToRowDataConverters.FieldToRowDataConverter;
+import org.apache.inlong.sort.formats.base.TableFormatUtils;
+import org.apache.inlong.sort.formats.base.TextFormatBuilder;
+import org.apache.inlong.sort.formats.common.RowFormatInfo;
+import 
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgFormatDeserializer;
+import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_DELIMITER;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_LINE_DELIMITER;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_RETAIN_PREDEFINED_FIELD;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsgcsv.InLongMsgCsvUtils.DEFAULT_DELETE_HEAD_DELIMITER;
+
+/**
+ * The deserializer for the records in InLongMsgCsv format.
+ *
+ * <p>Head and body parsing are delegated to {@code InLongMsgCsvUtils}; every parsed body is
+ * converted into a single {@link RowData}, which is then decorated with the optional time and
+ * attributes fields and with the requested metadata.
+ */
+public final class InLongMsgCsvFormatDeserializer extends AbstractInLongMsgFormatDeserializer {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Format information describing the result type.
+     */
+    @Nonnull
+    private final RowFormatInfo rowFormatInfo;
+
+    /**
+     * The name of the time field.
+     */
+    @Nullable
+    private final String timeFieldName;
+
+    /**
+     * The name of the attributes field.
+     */
+    @Nullable
+    private final String attributesFieldName;
+
+    /**
+     * The charset of the text.
+     */
+    @Nonnull
+    private final String charset;
+
+    /**
+     * The delimiter between fields.
+     */
+    @Nonnull
+    private final Character delimiter;
+
+    /**
+     * The delimiter between lines.
+     */
+    @Nullable
+    private final Character lineDelimiter;
+
+    /**
+     * Escape character. Null if escaping is disabled.
+     */
+    @Nullable
+    private final Character escapeChar;
+
+    /**
+     * Quote character. Null if quoting is disabled.
+     */
+    @Nullable
+    private final Character quoteChar;
+
+    /**
+     * The literal represented null values, default "".
+     */
+    @Nullable
+    private final String nullLiteral;
+
+    /**
+     * True if the head delimiter should be removed.
+     */
+    private final boolean deleteHeadDelimiter;
+
+    /**
+     * The metadata keys passed to the metadata decoration of every produced row.
+     */
+    private final List<String> metadataKeys;
+
+    /**
+     * One converter per field of {@link #rowFormatInfo}, in field order.
+     */
+    private final FieldToRowDataConverter[] converters;
+
+    /**
+     * Whether the predefined fields of the message head are handed to the row conversion.
+     * When false an empty list is passed instead. Default true.
+     */
+    private boolean retainPredefinedField = true;
+
+    /**
+     * Creates a deserializer whose failure handler is derived from {@code ignoreErrors} and
+     * whose handling of predefined head fields is controlled by {@code retainPredefinedField}.
+     */
+    public InLongMsgCsvFormatDeserializer(
+            @Nonnull RowFormatInfo rowFormatInfo,
+            @Nullable String timeFieldName,
+            @Nullable String attributesFieldName,
+            @Nonnull String charset,
+            @Nonnull Character delimiter,
+            @Nullable Character lineDelimiter,
+            @Nullable Character escapeChar,
+            @Nullable Character quoteChar,
+            @Nullable String nullLiteral,
+            boolean deleteHeadDelimiter,
+            List<String> metadataKeys,
+            boolean ignoreErrors,
+            boolean retainPredefinedField) {
+        this(
+                rowFormatInfo,
+                timeFieldName,
+                attributesFieldName,
+                charset,
+                delimiter,
+                lineDelimiter,
+                escapeChar,
+                quoteChar,
+                nullLiteral,
+                deleteHeadDelimiter,
+                metadataKeys,
+                InLongMsgUtils.getDefaultExceptionHandler(ignoreErrors));
+        // Assigned after delegation because the delegated constructor does not take this flag.
+        this.retainPredefinedField = retainPredefinedField;
+    }
+
+    /**
+     * Creates a deserializer whose failure handler is derived from {@code ignoreErrors}.
+     * Predefined head fields are retained (the field keeps its default of true).
+     */
+    public InLongMsgCsvFormatDeserializer(
+            @Nonnull RowFormatInfo rowFormatInfo,
+            @Nullable String timeFieldName,
+            @Nullable String attributesFieldName,
+            @Nonnull String charset,
+            @Nonnull Character delimiter,
+            @Nullable Character lineDelimiter,
+            @Nullable Character escapeChar,
+            @Nullable Character quoteChar,
+            @Nullable String nullLiteral,
+            boolean deleteHeadDelimiter,
+            List<String> metadataKeys,
+            boolean ignoreErrors) {
+        this(
+                rowFormatInfo,
+                timeFieldName,
+                attributesFieldName,
+                charset,
+                delimiter,
+                lineDelimiter,
+                escapeChar,
+                quoteChar,
+                nullLiteral,
+                deleteHeadDelimiter,
+                metadataKeys,
+                InLongMsgUtils.getDefaultExceptionHandler(ignoreErrors));
+    }
+
+    /**
+     * Creates a deserializer with an explicit failure handler and builds the per-field
+     * converters from {@code rowFormatInfo}. Predefined head fields are retained.
+     */
+    public InLongMsgCsvFormatDeserializer(
+            @Nonnull RowFormatInfo rowFormatInfo,
+            @Nullable String timeFieldName,
+            @Nullable String attributesFieldName,
+            @Nonnull String charset,
+            @Nonnull Character delimiter,
+            @Nullable Character lineDelimiter,
+            @Nullable Character escapeChar,
+            @Nullable Character quoteChar,
+            @Nullable String nullLiteral,
+            boolean deleteHeadDelimiter,
+            List<String> metadataKeys,
+            @Nonnull FailureHandler failureHandler) {
+        super(failureHandler);
+
+        this.rowFormatInfo = rowFormatInfo;
+        this.timeFieldName = timeFieldName;
+        this.attributesFieldName = attributesFieldName;
+        this.delimiter = delimiter;
+        this.lineDelimiter = lineDelimiter;
+        this.charset = charset;
+        this.escapeChar = escapeChar;
+        this.quoteChar = quoteChar;
+        this.nullLiteral = nullLiteral;
+        this.deleteHeadDelimiter = deleteHeadDelimiter;
+        this.metadataKeys = metadataKeys;
+
+        // Derive one converter per declared field from the field's logical type.
+        converters = Arrays.stream(rowFormatInfo.getFieldFormatInfos())
+                .map(formatInfo -> FieldToRowDataConverters.createConverter(
+                        TableFormatUtils.deriveLogicalType(formatInfo)))
+                .toArray(FieldToRowDataConverter[]::new);
+    }
+
+    @Override
+    public TypeInformation<RowData> getProducedType() {
+        return InLongMsgUtils.decorateRowTypeWithNeededHeadFieldsAndMetadata(
+                timeFieldName,
+                attributesFieldName,
+                rowFormatInfo,
+                metadataKeys);
+    }
+
+    @Override
+    protected InLongMsgHead parseHead(String attr) {
+        return InLongMsgCsvUtils.parseHead(attr);
+    }
+
+    @Override
+    protected List<InLongMsgBody> parseBodyList(byte[] bytes) {
+        return InLongMsgCsvUtils.parseBodyList(
+                bytes,
+                charset,
+                delimiter,
+                lineDelimiter,
+                escapeChar,
+                quoteChar,
+                deleteHeadDelimiter);
+    }
+
+    /**
+     * Converts one head/body pair into a singleton list holding the decorated row.
+     */
+    @Override
+    protected List<RowData> convertRowDataList(InLongMsgHead head, InLongMsgBody body) {
+        GenericRowData genericRowData = InLongMsgCsvUtils.deserializeRowData(
+                rowFormatInfo,
+                nullLiteral,
+                retainPredefinedField ? head.getPredefinedFields() : Collections.emptyList(),
+                body.getFields(),
+                converters);
+
+        // Decorate result with time and attributes fields if needed
+        genericRowData = InLongMsgUtils.decorateRowDataWithNeededHeadFields(
+                timeFieldName,
+                attributesFieldName,
+                head.getTime(),
+                head.getAttributes(),
+                genericRowData);
+
+        // Decorate result with metadata if needed
+        return Collections.singletonList(
+                InLongMsgUtils.decorateRowWithMetaData(genericRowData, head, metadataKeys));
+    }
+
+    /**
+     * The builder for {@link InLongMsgCsvFormatDeserializer}.
+     *
+     * <p>NOTE(review): {@code rowFormatInfo}, {@code charset}, {@code escapeChar},
+     * {@code quoteChar}, {@code nullLiteral} and {@code ignoreErrors} are not declared here;
+     * presumably they are inherited from {@link TextFormatBuilder} - confirm.
+     */
+    public static class Builder extends TextFormatBuilder<Builder> {
+
+        private String timeFieldName = DEFAULT_TIME_FIELD_NAME;
+        private String attributesFieldName = DEFAULT_ATTRIBUTES_FIELD_NAME;
+        private Character delimiter = DEFAULT_DELIMITER;
+        private Character lineDelimiter = DEFAULT_LINE_DELIMITER;
+        private Boolean deleteHeadDelimiter = DEFAULT_DELETE_HEAD_DELIMITER;
+        private List<String> metadataKeys = Collections.emptyList();
+        private Boolean retainPredefinedField = DEFAULT_RETAIN_PREDEFINED_FIELD;
+
+        public Builder(RowFormatInfo rowFormatInfo) {
+            super(rowFormatInfo);
+        }
+
+        public Builder setTimeFieldName(String timeFieldName) {
+            this.timeFieldName = timeFieldName;
+            return this;
+        }
+
+        public Builder setAttributesFieldName(String attributesFieldName) {
+            this.attributesFieldName = attributesFieldName;
+            return this;
+        }
+
+        public Builder setDelimiter(Character delimiter) {
+            this.delimiter = delimiter;
+            return this;
+        }
+
+        public Builder setLineDelimiter(Character lineDelimiter) {
+            this.lineDelimiter = lineDelimiter;
+            return this;
+        }
+
+        public Builder setDeleteHeadDelimiter(Boolean deleteHeadDelimiter) {
+            this.deleteHeadDelimiter = deleteHeadDelimiter;
+            return this;
+        }
+
+        public Builder setMetadataKeys(List<String> metadataKeys) {
+            this.metadataKeys = metadataKeys;
+            return this;
+        }
+
+        public Builder setRetainPredefinedField(Boolean retainPredefinedField) {
+            this.retainPredefinedField = retainPredefinedField;
+            return this;
+        }
+
+        /**
+         * Creates the deserializer from the values collected so far.
+         */
+        public InLongMsgCsvFormatDeserializer build() {
+            return new InLongMsgCsvFormatDeserializer(
+                    rowFormatInfo,
+                    timeFieldName,
+                    attributesFieldName,
+                    charset,
+                    delimiter,
+                    lineDelimiter,
+                    escapeChar,
+                    quoteChar,
+                    nullLiteral,
+                    deleteHeadDelimiter,
+                    metadataKeys,
+                    ignoreErrors,
+                    retainPredefinedField);
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        if (!super.equals(o)) {
+            return false;
+        }
+
+        InLongMsgCsvFormatDeserializer that = (InLongMsgCsvFormatDeserializer) o;
+        // retainPredefinedField changes the rows produced by convertRowDataList, so it has to
+        // take part in equality just like every other configuration value.
+        return deleteHeadDelimiter == that.deleteHeadDelimiter &&
+                retainPredefinedField == that.retainPredefinedField &&
+                rowFormatInfo.equals(that.rowFormatInfo) &&
+                Objects.equals(timeFieldName, that.timeFieldName) &&
+                Objects.equals(attributesFieldName, that.attributesFieldName) &&
+                charset.equals(that.charset) &&
+                delimiter.equals(that.delimiter) &&
+                Objects.equals(lineDelimiter, that.lineDelimiter) &&
+                Objects.equals(escapeChar, that.escapeChar) &&
+                Objects.equals(quoteChar, that.quoteChar) &&
+                Objects.equals(nullLiteral, that.nullLiteral) &&
+                Objects.equals(metadataKeys, that.metadataKeys);
+
+    }
+
+    @Override
+    public int hashCode() {
+        // Hashes exactly the fields compared in equals (converters are derived from rowFormatInfo).
+        return Objects.hash(super.hashCode(), rowFormatInfo, timeFieldName,
+                attributesFieldName, charset, delimiter, lineDelimiter, escapeChar, quoteChar,
+                nullLiteral, deleteHeadDelimiter, metadataKeys, retainPredefinedField);
+    }
+
+    @Override
+    public String toString() {
+        return "InLongMsgCsvFormatDeserializer{" +
+                "rowFormatInfo=" + rowFormatInfo +
+                ", timeFieldName='" + timeFieldName + '\'' +
+                ", attributesFieldName='" + attributesFieldName + '\'' +
+                ", charset='" + charset + '\'' +
+                ", delimiter=" + delimiter +
+                ", lineDelimiter=" + lineDelimiter +
+                ", escapeChar=" + escapeChar +
+                ", quoteChar=" + quoteChar +
+                ", nullLiteral='" + nullLiteral + '\'' +
+                ", deleteHeadDelimiter=" + deleteHeadDelimiter +
+                ", metadataKeys=" + metadataKeys +
+                ", retainPredefinedField=" + retainPredefinedField +
+                '}';
+    }
+}
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatFactory.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatFactory.java
new file mode 100644
index 0000000000..3800f5440f
--- /dev/null
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatFactory.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsgcsv;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static 
org.apache.inlong.sort.formats.base.TableFormatOptions.ROW_FORMAT_INFO;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.ATTRIBUTE_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.CHARSET;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.DELETE_HEAD_DELIMITER;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.ESCAPE_CHARACTER;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.FIELD_DELIMITER;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.IGNORE_ERRORS;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.LINE_DELIMITER;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.NULL_LITERAL;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.QUOTE_CHARACTER;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.RETAIN_PREDEFINED_FIELD;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.TIME_FIELD_NAME;
+
+/**
+ * Table format factory registered under the identifier {@value #IDENTIFIER}. It validates the
+ * format options and creates the decoding format that deserializes InLongMsgCsv records into
+ * {@link RowData}.
+ */
+public final class InLongMsgCsvFormatFactory implements DeserializationFormatFactory {
+
+    public static final String IDENTIFIER = "inlong-msg-csv";
+
+    @Override
+    public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
+            DynamicTableFactory.Context context, ReadableConfig formatOptions) {
+        // Generic factory validation first, then the format-specific single-character checks.
+        FactoryUtil.validateFactoryOptions(this, formatOptions);
+        validateFormatOptions(formatOptions);
+        return new InLongMsgCsvDecodingFormat(formatOptions);
+    }
+
+    // ------------------------------------------------------------------------
+    // Validation
+    // ------------------------------------------------------------------------
+
+    /**
+     * Checks that every character-valued option, when configured, is exactly one character
+     * long. The two delimiters are unescaped before the check; quote and escape are not.
+     */
+    static void validateFormatOptions(ReadableConfig tableOptions) {
+        validateCharacterVal(tableOptions, FIELD_DELIMITER, true);
+        validateCharacterVal(tableOptions, LINE_DELIMITER, true);
+        validateCharacterVal(tableOptions, QUOTE_CHARACTER, false);
+        validateCharacterVal(tableOptions, ESCAPE_CHARACTER, false);
+    }
+
+    /**
+     * Ensures the value of {@code option}, if configured, consists of a single character.
+     *
+     * @param tableOptions the table options
+     * @param option       the config option
+     * @param unescape     whether Java escape sequences are resolved before measuring the length
+     * @throws ValidationException if the (optionally unescaped) value is not one character long
+     */
+    private static void validateCharacterVal(
+            ReadableConfig tableOptions, ConfigOption<String> option, boolean unescape) {
+        if (!tableOptions.getOptional(option).isPresent()) {
+            // Nothing configured, nothing to validate.
+            return;
+        }
+
+        final String configured = tableOptions.get(option);
+        final String effective = unescape ? StringEscapeUtils.unescapeJava(configured) : configured;
+        if (effective.length() == 1) {
+            return;
+        }
+
+        // The message reports the value as configured, not the unescaped form.
+        throw new ValidationException(
+                String.format(
+                        "Option '%s.%s' must be a string with single character, but was: %s",
+                        IDENTIFIER, option.key(), configured));
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        Set<ConfigOption<?>> required = new HashSet<>();
+        required.add(ROW_FORMAT_INFO);
+        return required;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        return Stream.<ConfigOption<?>>of(
+                TIME_FIELD_NAME,
+                ATTRIBUTE_FIELD_NAME,
+                CHARSET,
+                FIELD_DELIMITER,
+                LINE_DELIMITER,
+                QUOTE_CHARACTER,
+                ESCAPE_CHARACTER,
+                NULL_LITERAL,
+                IGNORE_ERRORS,
+                DELETE_HEAD_DELIMITER,
+                RETAIN_PREDEFINED_FIELD)
+                .collect(Collectors.toSet());
+    }
+}
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvRowDataDeserializationSchema.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvRowDataDeserializationSchema.java
new file mode 100644
index 0000000000..5c021e8be9
--- /dev/null
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvRowDataDeserializationSchema.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsgcsv;
+
+import org.apache.inlong.sort.formats.common.RowFormatInfo;
+import 
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgDeserializationSchema;
+import 
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgFormatDeserializer;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Collections;
+import java.util.List;
+
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_CHARSET;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_DELIMITER;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_ESCAPE_CHARACTER;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_LINE_DELIMITER;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_NULL_LITERAL;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_QUOTE_CHARACTER;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
+
+/**
+ * Deserialization schema from InLongMsg-CSV to Flink Table & SQL internal 
data structures.
+ */
+public class InLongMsgCsvRowDataDeserializationSchema extends 
AbstractInLongMsgDeserializationSchema {
+
+    private static final long serialVersionUID = 1L;
+
+    public 
InLongMsgCsvRowDataDeserializationSchema(AbstractInLongMsgFormatDeserializer 
formatDeserializer) {
+        super(formatDeserializer);
+    }
+
+    /**
+     * A builder for creating a {@link 
InLongMsgCsvRowDataDeserializationSchema}.
+     */
+    @PublicEvolving
+    public static class Builder {
+
+        private final RowFormatInfo rowFormatInfo;
+
+        private String timeFieldName = DEFAULT_TIME_FIELD_NAME;
+        private String attributesFieldName = DEFAULT_ATTRIBUTES_FIELD_NAME;
+        private String charset = DEFAULT_CHARSET;
+        private Character fieldDelimiter = DEFAULT_DELIMITER;
+        private Character escapeChar = DEFAULT_ESCAPE_CHARACTER;
+        private Character quoteChar = DEFAULT_QUOTE_CHARACTER;
+        private String nullLiteral = DEFAULT_NULL_LITERAL;
+        private Character lineDelimiter = DEFAULT_LINE_DELIMITER;
+        private boolean deleteHeadDelimiter;
+        private boolean retainPredefinedField;
+        private boolean ignoreErrors = false;
+        private List<String> metadataKeys = Collections.emptyList();
+
+        protected Builder(RowFormatInfo rowFormatInfo) {
+            this.rowFormatInfo = rowFormatInfo;
+        }
+
+        public Builder setTimeFieldName(String timeFieldName) {
+            this.timeFieldName = timeFieldName;
+            return this;
+        }
+
+        public Builder setAttributesFieldName(String attributesFieldName) {
+            this.attributesFieldName = attributesFieldName;
+            return this;
+        }
+
+        public Builder setFieldDelimiter(char fieldDelimiter) {
+            this.fieldDelimiter = fieldDelimiter;
+            return this;
+        }
+
+        public Builder setCharset(String charset) {
+            this.charset = charset;
+            return this;
+        }
+
+        public Builder setEscapeCharacter(char escapeChar) {
+            this.escapeChar = escapeChar;
+            return this;
+        }
+
+        public Builder setQuoteCharacter(char quoteChar) {
+            this.quoteChar = quoteChar;
+            return this;
+        }
+
+        public Builder setNullLiteral(String nullLiteral) {
+            this.nullLiteral = nullLiteral;
+            return this;
+        }
+
+        public Builder setDeleteHeadDelimiter(boolean deleteHeadDelimiter) {
+            this.deleteHeadDelimiter = deleteHeadDelimiter;
+            return this;
+        }
+
+        public Builder setRetainPredefinedField(boolean retainPredefinedField) 
{
+            this.retainPredefinedField = retainPredefinedField;
+            return this;
+        }
+
+        public Builder setLineDelimiter(char lineDelimiter) {
+            this.lineDelimiter = lineDelimiter;
+            return this;
+        }
+
+        public Builder setIgnoreErrors(boolean ignoreErrors) {
+            this.ignoreErrors = ignoreErrors;
+            return this;
+        }
+
+        public Builder setMetadataKeys(List<String> metadataKeys) {
+            this.metadataKeys = metadataKeys;
+            return this;
+        }
+
+        public InLongMsgCsvRowDataDeserializationSchema build() {
+            AbstractInLongMsgFormatDeserializer formatDeserializer = new 
InLongMsgCsvFormatDeserializer(
+                    rowFormatInfo,
+                    timeFieldName,
+                    attributesFieldName,
+                    charset,
+                    fieldDelimiter,
+                    lineDelimiter,
+                    escapeChar,
+                    quoteChar,
+                    nullLiteral,
+                    deleteHeadDelimiter,
+                    metadataKeys,
+                    ignoreErrors,
+                    retainPredefinedField);
+
+            return new 
InLongMsgCsvRowDataDeserializationSchema(formatDeserializer);
+        }
+    }
+}
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
new file mode 100644
index 0000000000..24883a65bd
--- /dev/null
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsgcsv;
+
+import org.apache.inlong.sort.formats.base.FieldToRowDataConverters;
+import 
org.apache.inlong.sort.formats.base.FieldToRowDataConverters.FieldToRowDataConverter;
+import org.apache.inlong.sort.formats.common.FormatInfo;
+import org.apache.inlong.sort.formats.common.RowFormatInfo;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.Charset;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static 
org.apache.inlong.sort.formats.base.TableFormatUtils.deserializeBasicField;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_INTERFACE_ID;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_INTERFACE_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_INTERFACE_TID;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_STREAMID;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TIME_DT;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TIME_T;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.getPredefinedFields;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseAttr;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseDateTime;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseEpochTime;
+import static org.apache.inlong.sort.formats.util.StringUtils.splitCsv;
+
+/**
+ * Static helpers for InLongMsg-CSV: parsing the attribute head, splitting a body into
+ * records and converting split fields into {@link GenericRowData}.
+ */
+public class InLongMsgCsvUtils {
+
+    private static final Logger LOG = LoggerFactory.getLogger(InLongMsgCsvUtils.class);
+
+    public static final String FORMAT_DELETE_HEAD_DELIMITER = "format.delete-head-delimiter";
+    public static final boolean DEFAULT_DELETE_HEAD_DELIMITER = true;
+
+    /**
+     * Parses the attribute string of a message into an {@link InLongMsgHead}.
+     *
+     * @param attr the raw attribute string
+     * @return the head holding the attributes, stream id, time and predefined fields
+     * @throws IllegalArgumentException if no stream-id attribute or no time attribute is present
+     */
+    public static InLongMsgHead parseHead(String attr) {
+        final Map<String, String> attributes = parseAttr(attr);
+
+        // The stream id is resolved before the time, so a message missing both reports the
+        // stream id first.
+        final String streamId = extractStreamId(attributes);
+        final Timestamp time = extractTime(attributes);
+
+        return new InLongMsgHead(attributes, streamId, time, getPredefinedFields(attributes));
+    }
+
+    /** Returns the value of the first stream-id attribute present, in fixed priority order. */
+    private static String extractStreamId(Map<String, String> attributes) {
+        if (attributes.containsKey(INLONGMSG_ATTR_INTERFACE_NAME)) {
+            return attributes.get(INLONGMSG_ATTR_INTERFACE_NAME);
+        }
+        if (attributes.containsKey(INLONGMSG_ATTR_INTERFACE_ID)) {
+            return attributes.get(INLONGMSG_ATTR_INTERFACE_ID);
+        }
+        if (attributes.containsKey(INLONGMSG_ATTR_INTERFACE_TID)) {
+            return attributes.get(INLONGMSG_ATTR_INTERFACE_TID);
+        }
+        if (attributes.containsKey(INLONGMSG_ATTR_STREAMID)) {
+            return attributes.get(INLONGMSG_ATTR_STREAMID);
+        }
+        throw new IllegalArgumentException(
+                "Could not find " + INLONGMSG_ATTR_INTERFACE_NAME +
+                        " or " + INLONGMSG_ATTR_INTERFACE_ID +
+                        " or " + INLONGMSG_ATTR_INTERFACE_TID +
+                        " or " + INLONGMSG_ATTR_STREAMID + " in attributes!");
+    }
+
+    /** Returns the message time, preferring the date-time attribute over the epoch attribute. */
+    private static Timestamp extractTime(Map<String, String> attributes) {
+        if (attributes.containsKey(INLONGMSG_ATTR_TIME_T)) {
+            return parseDateTime(attributes.get(INLONGMSG_ATTR_TIME_T).trim());
+        }
+        if (attributes.containsKey(INLONGMSG_ATTR_TIME_DT)) {
+            return parseEpochTime(attributes.get(INLONGMSG_ATTR_TIME_DT).trim());
+        }
+        throw new IllegalArgumentException(
+                "Could not find " + INLONGMSG_ATTR_TIME_T +
+                        " or " + INLONGMSG_ATTR_TIME_DT + " in attributes!");
+    }
+
+    /**
+     * Decodes the body bytes and splits them into one {@link InLongMsgBody} per record.
+     *
+     * @param bytes the raw body
+     * @param charset the name of the charset used to decode {@code bytes}
+     * @param delimiter the field delimiter
+     * @param lineDelimiter the line delimiter handed to {@code splitCsv}
+     * @param escapeChar the escape character handed to {@code splitCsv}
+     * @param quoteChar the quote character handed to {@code splitCsv}
+     * @param deleteHeadDelimiter flag handed to {@code splitCsv}
+     * @return the bodies, in the order produced by {@code splitCsv}
+     */
+    public static List<InLongMsgBody> parseBodyList(
+            byte[] bytes,
+            String charset,
+            char delimiter,
+            Character lineDelimiter,
+            Character escapeChar,
+            Character quoteChar,
+            boolean deleteHeadDelimiter) {
+        final String text = new String(bytes, Charset.forName(charset));
+        final String[][] records =
+                splitCsv(text, delimiter, escapeChar, quoteChar, lineDelimiter, deleteHeadDelimiter);
+
+        // Only the parsed fields are populated; the remaining constructor arguments are left
+        // empty, as downstream code uses the fields alone.
+        return Arrays.stream(records)
+                .map(record -> Arrays.asList(record))
+                .map(parsed -> new InLongMsgBody(null, null, parsed, Collections.emptyMap()))
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Converts predefined fields followed by body fields into a row laid out like the schema.
+     *
+     * <p>Values beyond the schema length are ignored; schema positions without a value are
+     * set to {@code null}. A length mismatch is logged as a warning, not raised.
+     *
+     * @param rowFormatInfo the schema providing field names and formats
+     * @param nullLiteral the literal handed to {@code deserializeBasicField}
+     * @param predefinedFields the texts occupying the leading schema positions
+     * @param fields the texts occupying the positions after the predefined ones
+     * @param converters one converter per schema position
+     * @return the populated row
+     */
+    public static GenericRowData deserializeRowData(
+            RowFormatInfo rowFormatInfo,
+            String nullLiteral,
+            List<String> predefinedFields,
+            List<String> fields,
+            FieldToRowDataConverters.FieldToRowDataConverter[] converters) {
+        final String[] names = rowFormatInfo.getFieldNames();
+        final FormatInfo[] formats = rowFormatInfo.getFieldFormatInfos();
+
+        final int schemaSize = names.length;
+        final int predefinedSize = predefinedFields.size();
+        final int actualSize = predefinedSize + fields.size();
+
+        if (actualSize != schemaSize) {
+            LOG.warn("The number of fields mismatches: expected={}, actual={}. " +
+                    "PredefinedFields=[{}], Fields=[{}]", schemaSize, actualSize,
+                    predefinedFields, fields);
+        }
+
+        final GenericRowData rowData = new GenericRowData(schemaSize);
+
+        // One positional pass: predefined texts occupy the leading positions, body texts follow.
+        final int convertible = Math.min(actualSize, schemaSize);
+        for (int pos = 0; pos < convertible; ++pos) {
+            final String name = names[pos];
+            final FormatInfo format = formats[pos];
+            final FieldToRowDataConverter converter = converters[pos];
+            final String text = pos < predefinedSize
+                    ? predefinedFields.get(pos)
+                    : fields.get(pos - predefinedSize);
+
+            rowData.setField(pos, converter.convert(deserializeBasicField(name, format, text, nullLiteral)));
+        }
+
+        // Schema positions without any input value are explicitly nulled.
+        for (int pos = actualSize; pos < schemaSize; ++pos) {
+            rowData.setField(pos, null);
+        }
+
+        return rowData;
+    }
+}
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 0000000000..52a23d9ac8
--- /dev/null
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.inlong.sort.formats.inlongmsgcsv.InLongMsgCsvFormatFactory
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java
new file mode 100644
index 0000000000..47e3fe0595
--- /dev/null
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java
@@ -0,0 +1,861 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsgcsv;
+
+import org.apache.inlong.common.msg.InLongMsg;
+import org.apache.inlong.sort.formats.base.FieldToRowDataConverters;
+import 
org.apache.inlong.sort.formats.base.FieldToRowDataConverters.FieldToRowDataConverter;
+import org.apache.inlong.sort.formats.common.FormatInfo;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.MapFormatInfo;
+import org.apache.inlong.sort.formats.common.RowFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
+
+import org.apache.flink.api.common.functions.util.ListCollector;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+import java.nio.charset.Charset;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.api.DataTypes.MAP;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_CHARSET;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_DELIMITER;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgMetadata.ReadableMetadata.STREAMID;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit tests for {@link InLongMsgCsvFormatDeserializer}.
+ */
+public class InLongMsgCsvFormatDeserializerTest {
+
+    private final FieldToRowDataConverter mapConvert = // MAP<STRING, STRING> converter for expected attribute columns
+            FieldToRowDataConverters.createConverter(MAP(STRING(), 
STRING()).getLogicalType());
+
+    private static final RowFormatInfo TEST_ROW_INFO = // schema f1..f6: three int columns, then three string columns
+            new RowFormatInfo(
+                    new String[]{"f1", "f2", "f3", "f4", "f5", "f6"},
+                    new FormatInfo[]{
+                            IntFormatInfo.INSTANCE,
+                            IntFormatInfo.INSTANCE,
+                            IntFormatInfo.INSTANCE,
+                            StringFormatInfo.INSTANCE,
+                            StringFormatInfo.INSTANCE,
+                            StringFormatInfo.INSTANCE
+                    });
+
+    private static final RowFormatInfo TEST_NO_PREDEFINE_ROW_INFO = // f1..f6 preceded by explicit time and attributes columns
+            new RowFormatInfo(
+                    new String[]{"inlongmsg_time", "inlongmsg_attributes", 
"f1", "f2", "f3", "f4", "f5", "f6"},
+                    new FormatInfo[]{
+                            new TimestampFormatInfo(),
+                            new MapFormatInfo(StringFormatInfo.INSTANCE, 
StringFormatInfo.INSTANCE),
+                            IntFormatInfo.INSTANCE,
+                            IntFormatInfo.INSTANCE,
+                            IntFormatInfo.INSTANCE,
+                            StringFormatInfo.INSTANCE,
+                            StringFormatInfo.INSTANCE,
+                            StringFormatInfo.INSTANCE
+                    });
+
+    /**
+     * The produced type must list the time and attributes columns first, then the six schema
+     * columns, then the requested stream-id metadata column.
+     */
+    @Test
+    public void testRowType() {
+        RowType expectedRowType = RowType.of(
+                new LogicalType[]{
+                        new TimestampType(),
+                        new MapType(new VarCharType(), new VarCharType()),
+                        new IntType(),
+                        new IntType(),
+                        new IntType(),
+                        new VarCharType(),
+                        new VarCharType(),
+                        new VarCharType(),
+                        new VarCharType()
+                },
+                new String[]{
+                        "inlongmsg_time",
+                        "inlongmsg_attributes",
+                        "f1",
+                        "f2",
+                        "f3",
+                        "f4",
+                        "f5",
+                        "f6",
+                        "metadata-streamId"
+                });
+
+        InLongMsgCsvFormatDeserializer deserializer =
+                new InLongMsgCsvFormatDeserializer.Builder(TEST_ROW_INFO)
+                        .setTimeFieldName("inlongmsg_time")
+                        .setAttributesFieldName("inlongmsg_attributes")
+                        .setMetadataKeys(Collections.singletonList(STREAMID.getKey()))
+                        .build();
+
+        assertEquals(InternalTypeInfo.of(expectedRowType), deserializer.getProducedType());
+    }
+
+    /**
+     * With a builder left at its defaults, the produced type must consist of the six schema
+     * columns only.
+     */
+    @Test
+    public void testRowTypeWithoutHeadFields() {
+        RowType schemaOnlyType = RowType.of(
+                new LogicalType[]{
+                        new IntType(),
+                        new IntType(),
+                        new IntType(),
+                        new VarCharType(),
+                        new VarCharType(),
+                        new VarCharType()
+                },
+                new String[]{"f1", "f2", "f3", "f4", "f5", "f6"});
+
+        InLongMsgCsvFormatDeserializer deserializer =
+                new InLongMsgCsvFormatDeserializer.Builder(TEST_ROW_INFO).build();
+
+        assertEquals(InternalTypeInfo.of(schemaOnlyType), deserializer.getProducedType());
+    }
+
+    /**
+     * Two well-formed bodies sharing one attribute string must yield two rows: time, attribute
+     * map, the two {@code __addcolN__} values and the four body fields.
+     */
+    @Test
+    public void testNormal() throws Exception {
+        InLongMsgCsvFormatDeserializer deserializer =
+                new InLongMsgCsvFormatDeserializer.Builder(TEST_ROW_INFO)
+                        .setTimeFieldName("inlongmsg_time")
+                        .setAttributesFieldName("inlongmsg_attributes")
+                        .build();
+
+        final String attrs = "m=0&streamId=testInterfaceId&t=20200322&__addcol1__=1&__addcol2__=2";
+        final String firstBody = "123,field11,field12,field13";
+        final String secondBody = "123,field21,field22,field23";
+
+        InLongMsg inLongMsg = InLongMsg.newInLongMsg();
+        inLongMsg.addMsg(attrs, firstBody.getBytes());
+        inLongMsg.addMsg(attrs, secondBody.getBytes());
+
+        Map<String, String> expectedAttributes = new HashMap<>();
+        expectedAttributes.put("m", "0");
+        expectedAttributes.put("streamId", "testInterfaceId");
+        expectedAttributes.put("t", "20200322");
+        expectedAttributes.put("__addcol1__", "1");
+        expectedAttributes.put("__addcol2__", "2");
+
+        GenericRowData firstExpected = GenericRowData.of(
+                TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00")),
+                mapConvert.convert(expectedAttributes),
+                1,
+                2,
+                123,
+                StringData.fromString("field11"),
+                StringData.fromString("field12"),
+                StringData.fromString("field13"));
+
+        GenericRowData secondExpected = GenericRowData.of(
+                TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00")),
+                mapConvert.convert(expectedAttributes),
+                1,
+                2,
+                123,
+                StringData.fromString("field21"),
+                StringData.fromString("field22"),
+                StringData.fromString("field23"));
+
+        testRowDeserialization(
+                deserializer,
+                inLongMsg.buildArray(),
+                Arrays.asList(firstExpected, secondExpected));
+    }
+
+    /**
+     * Same scenario as the normal case, with the stream id attribute set to
+     * {@code testStreamId}; both bodies must be converted into full rows.
+     */
+    @Test
+    public void testInlongMsg() throws Exception {
+        InLongMsgCsvFormatDeserializer deserializer =
+                new InLongMsgCsvFormatDeserializer.Builder(TEST_ROW_INFO)
+                        .setTimeFieldName("inlongmsg_time")
+                        .setAttributesFieldName("inlongmsg_attributes")
+                        .build();
+
+        final String attrs = "m=0&streamId=testStreamId&t=20200322&__addcol1__=1&__addcol2__=2";
+        final String firstBody = "123,field11,field12,field13";
+        final String secondBody = "123,field21,field22,field23";
+
+        InLongMsg inLongMsg = InLongMsg.newInLongMsg();
+        inLongMsg.addMsg(attrs, firstBody.getBytes());
+        inLongMsg.addMsg(attrs, secondBody.getBytes());
+
+        Map<String, String> expectedAttributes = new HashMap<>();
+        expectedAttributes.put("m", "0");
+        expectedAttributes.put("streamId", "testStreamId");
+        expectedAttributes.put("t", "20200322");
+        expectedAttributes.put("__addcol1__", "1");
+        expectedAttributes.put("__addcol2__", "2");
+
+        GenericRowData firstExpected = GenericRowData.of(
+                TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00")),
+                mapConvert.convert(expectedAttributes),
+                1,
+                2,
+                123,
+                StringData.fromString("field11"),
+                StringData.fromString("field12"),
+                StringData.fromString("field13"));
+
+        GenericRowData secondExpected = GenericRowData.of(
+                TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00")),
+                mapConvert.convert(expectedAttributes),
+                1,
+                2,
+                123,
+                StringData.fromString("field21"),
+                StringData.fromString("field22"),
+                StringData.fromString("field23"));
+
+        testRowDeserialization(
+                deserializer,
+                inLongMsg.buildArray(),
+                Arrays.asList(firstExpected, secondExpected));
+    }
+
+    /**
+     * Failures must be routed to the configured handler: first a body whose leading field
+     * ({@code test}) does not fit an int column, then a message whose attributes carry no
+     * {@code t} entry. One row failure and one head failure are expected.
+     */
+    @Test
+    public void testExceptionHandler() throws Exception {
+        TestFailureHandler failureHandler = new TestFailureHandler();
+        InLongMsgCsvFormatDeserializer deserializer =
+                new InLongMsgCsvFormatDeserializer(
+                        TEST_ROW_INFO,
+                        "inlongmsg_time",
+                        "inlongmsg_attributes",
+                        DEFAULT_CHARSET,
+                        DEFAULT_DELIMITER,
+                        null,
+                        null,
+                        null,
+                        null,
+                        InLongMsgCsvUtils.DEFAULT_DELETE_HEAD_DELIMITER,
+                        Collections.emptyList(),
+                        failureHandler);
+
+        final String invalidBody = "test,field11,field12,field13";
+        final String validBody = "123,field21,field22,field23";
+
+        List<RowData> collectedRows = new ArrayList<>();
+        Collector<RowData> collector = new ListCollector<>(collectedRows);
+
+        // Scenario 1: valid head, first body cannot be converted.
+        final String attrs = "m=0&streamId=testInterfaceId&t=20200322&__addcol1__=1&__addcol2__=2";
+        InLongMsg msgWithBadRow = InLongMsg.newInLongMsg();
+        msgWithBadRow.addMsg(attrs, invalidBody.getBytes());
+        msgWithBadRow.addMsg(attrs, validBody.getBytes());
+        deserializer.flatMap(msgWithBadRow.buildArray(), collector);
+        assertEquals(1, failureHandler.getRowCount());
+
+        // Scenario 2: the attribute string has no "t" entry.
+        final String attrsWithoutTime = "m=0&streamId=testInterfaceId&__addcol1__=1&__addcol2__=2";
+        InLongMsg msgWithBadHead = InLongMsg.newInLongMsg();
+        msgWithBadHead.addMsg(attrsWithoutTime, invalidBody.getBytes());
+        msgWithBadHead.addMsg(attrsWithoutTime, validBody.getBytes());
+        deserializer.flatMap(msgWithBadHead.buildArray(), collector);
+        assertEquals(1, failureHandler.getHeadCount());
+    }
+
+    /**
+     * An empty CSV field, whether trailing or in the middle, must become an empty string in
+     * the corresponding string column.
+     */
+    @Test
+    public void testEmptyField() throws Exception {
+        InLongMsgCsvFormatDeserializer deserializer =
+                new InLongMsgCsvFormatDeserializer.Builder(TEST_ROW_INFO)
+                        .setTimeFieldName("inlongmsg_time")
+                        .setAttributesFieldName("inlongmsg_attributes")
+                        .build();
+
+        final String attrs = "m=0&streamId=testInterfaceId&t=20200322&__addcol1__=1&__addcol2__=2";
+        final String trailingEmptyBody = "123,field11,field12,";
+        final String middleEmptyBody = "123,field21,,field23";
+
+        InLongMsg inLongMsg = InLongMsg.newInLongMsg();
+        inLongMsg.addMsg(attrs, trailingEmptyBody.getBytes());
+        inLongMsg.addMsg(attrs, middleEmptyBody.getBytes());
+
+        Map<String, String> expectedAttributes = new HashMap<>();
+        expectedAttributes.put("m", "0");
+        expectedAttributes.put("streamId", "testInterfaceId");
+        expectedAttributes.put("t", "20200322");
+        expectedAttributes.put("__addcol1__", "1");
+        expectedAttributes.put("__addcol2__", "2");
+
+        GenericRowData firstExpected = GenericRowData.of(
+                TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00")),
+                mapConvert.convert(expectedAttributes),
+                1,
+                2,
+                123,
+                StringData.fromString("field11"),
+                StringData.fromString("field12"),
+                StringData.fromString(""));
+
+        GenericRowData secondExpected = GenericRowData.of(
+                TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00")),
+                mapConvert.convert(expectedAttributes),
+                1,
+                2,
+                123,
+                StringData.fromString("field21"),
+                StringData.fromString(""),
+                StringData.fromString("field23"));
+
+        testRowDeserialization(
+                deserializer,
+                inLongMsg.buildArray(),
+                Arrays.asList(firstExpected, secondExpected));
+    }
+
+    /**
+     * Checks deserialization when the attributes carry no {@code __addcolN__}
+     * entries: every data column of the expected rows is taken from the CSV body.
+     */
+    @Test
+    public void testNoPredefinedFields() throws Exception {
+
+        InLongMsgCsvFormatDeserializer.Builder builder =
+                new InLongMsgCsvFormatDeserializer.Builder(TEST_ROW_INFO);
+        builder.setTimeFieldName("inlongmsg_time");
+        builder.setAttributesFieldName("inlongmsg_attributes");
+        InLongMsgCsvFormatDeserializer deserializer = builder.build();
+
+        String head = "m=0&streamId=testInterfaceId&t=20200322";
+        InLongMsg message = InLongMsg.newInLongMsg();
+        message.addMsg(head, "1,2,123,field11,field12,".getBytes());
+        message.addMsg(head, "1,2,123,field21,,field23".getBytes());
+
+        Map<String, String> attributeMap = new HashMap<>();
+        attributeMap.put("m", "0");
+        attributeMap.put("streamId", "testInterfaceId");
+        attributeMap.put("t", "20200322");
+
+        TimestampData eventTime =
+                TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00"));
+
+        GenericRowData firstExpected = GenericRowData.of(
+                eventTime,
+                mapConvert.convert(attributeMap),
+                1,
+                2,
+                123,
+                StringData.fromString("field11"),
+                StringData.fromString("field12"),
+                StringData.fromString(""));
+
+        GenericRowData secondExpected = GenericRowData.of(
+                eventTime,
+                mapConvert.convert(attributeMap),
+                1,
+                2,
+                123,
+                StringData.fromString("field21"),
+                StringData.fromString(""),
+                StringData.fromString("field23"));
+
+        testRowDeserialization(
+                deserializer,
+                message.buildArray(),
+                Arrays.asList(firstExpected, secondExpected));
+    }
+
+    /**
+     * Checks the {@code retainPredefinedField(true)} setting: the values of the
+     * {@code __addcol1__repdate} and {@code __addcol2__hour} attributes are expected
+     * in the two leading data columns, ahead of the fields parsed from the body.
+     */
+    @Test
+    public void testPredefinedFieldWithFlagOn() throws Exception {
+
+        InLongMsgCsvFormatDeserializer.Builder builder =
+                new InLongMsgCsvFormatDeserializer.Builder(TEST_ROW_INFO);
+        builder.setTimeFieldName("inlongmsg_time");
+        builder.setAttributesFieldName("inlongmsg_attributes");
+        builder.setRetainPredefinedField(true);
+        InLongMsgCsvFormatDeserializer deserializer = builder.build();
+
+        String head = "m=0&streamId=testInterfaceId&t=20200322"
+                + "&__addcol1__repdate=20220224&__addcol2__hour=1517";
+        InLongMsg message = InLongMsg.newInLongMsg();
+        message.addMsg(head, "123,field11,field12,".getBytes());
+        message.addMsg(head, "123,field21,,field23".getBytes());
+
+        Map<String, String> attributeMap = new HashMap<>();
+        attributeMap.put("m", "0");
+        attributeMap.put("streamId", "testInterfaceId");
+        attributeMap.put("t", "20200322");
+        attributeMap.put("__addcol1__repdate", "20220224");
+        attributeMap.put("__addcol2__hour", "1517");
+
+        TimestampData eventTime =
+                TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00"));
+
+        GenericRowData firstExpected = GenericRowData.of(
+                eventTime,
+                mapConvert.convert(attributeMap),
+                20220224,
+                1517,
+                123,
+                StringData.fromString("field11"),
+                StringData.fromString("field12"),
+                StringData.fromString(""));
+
+        GenericRowData secondExpected = GenericRowData.of(
+                eventTime,
+                mapConvert.convert(attributeMap),
+                20220224,
+                1517,
+                123,
+                StringData.fromString("field21"),
+                StringData.fromString(""),
+                StringData.fromString("field23"));
+
+        testRowDeserialization(
+                deserializer,
+                message.buildArray(),
+                Arrays.asList(firstExpected, secondExpected));
+    }
+
+    /**
+     * Checks the {@code retainPredefinedField(false)} setting: although the
+     * attributes contain {@code __addcol1__repdate} and {@code __addcol2__hour},
+     * every data column of the expected rows comes from the CSV body. The
+     * attributes map itself still contains both entries.
+     */
+    @Test
+    public void testPredefinedFieldWithFlagOff() throws Exception {
+
+        InLongMsgCsvFormatDeserializer.Builder builder =
+                new InLongMsgCsvFormatDeserializer.Builder(TEST_ROW_INFO);
+        builder.setTimeFieldName("inlongmsg_time");
+        builder.setAttributesFieldName("inlongmsg_attributes");
+        builder.setRetainPredefinedField(false);
+        InLongMsgCsvFormatDeserializer deserializer = builder.build();
+
+        String head = "m=0&streamId=testInterfaceId&t=20200322"
+                + "&__addcol1__repdate=20220224&__addcol2__hour=1517";
+        InLongMsg message = InLongMsg.newInLongMsg();
+        message.addMsg(head, "1,2,123,field11,field12,".getBytes());
+        message.addMsg(head, "1,2,123,field21,,field23".getBytes());
+
+        Map<String, String> attributeMap = new HashMap<>();
+        attributeMap.put("m", "0");
+        attributeMap.put("streamId", "testInterfaceId");
+        attributeMap.put("t", "20200322");
+        attributeMap.put("__addcol1__repdate", "20220224");
+        attributeMap.put("__addcol2__hour", "1517");
+
+        TimestampData eventTime =
+                TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00"));
+
+        GenericRowData firstExpected = GenericRowData.of(
+                eventTime,
+                mapConvert.convert(attributeMap),
+                1,
+                2,
+                123,
+                StringData.fromString("field11"),
+                StringData.fromString("field12"),
+                StringData.fromString(""));
+
+        GenericRowData secondExpected = GenericRowData.of(
+                eventTime,
+                mapConvert.convert(attributeMap),
+                1,
+                2,
+                123,
+                StringData.fromString("field21"),
+                StringData.fromString(""),
+                StringData.fromString("field23"));
+
+        testRowDeserialization(
+                deserializer,
+                message.buildArray(),
+                Arrays.asList(firstExpected, secondExpected));
+    }
+
+    /**
+     * Feeds a message whose attribute string is {@code "m=0&&&&"} to a deserializer
+     * created through the full constructor and expects that no row is emitted.
+     * NOTE(review): presumably one of the trailing {@code true} arguments is the
+     * ignore-errors flag; confirm against the constructor signature.
+     */
+    @Test
+    public void testIgnoreAttributeErrors() throws Exception {
+        String charsetName = Charset.defaultCharset().name();
+        InLongMsgCsvFormatDeserializer deserializer =
+                new InLongMsgCsvFormatDeserializer(
+                        TEST_ROW_INFO,
+                        DEFAULT_TIME_FIELD_NAME,
+                        DEFAULT_ATTRIBUTES_FIELD_NAME,
+                        charsetName,
+                        DEFAULT_DELIMITER,
+                        null,
+                        null,
+                        null,
+                        null,
+                        true,
+                        Collections.emptyList(),
+                        true);
+
+        // The attribute head has no stream id and no time, only dangling separators.
+        InLongMsg message = InLongMsg.newInLongMsg();
+        message.addMsg("m=0&&&&", "123,field11,field12,field13".getBytes());
+
+        testRowDeserialization(deserializer, message.buildArray(), Collections.emptyList());
+    }
+
+    /**
+     * Checks the {@code ignoreErrors(true)} setting for body errors: the first
+     * record starts with {@code "aaa"} where the expected rows hold an integer,
+     * so only the second record is expected in the output.
+     */
+    @Test
+    public void testIgnoreBodyErrors() throws Exception {
+        InLongMsgCsvFormatDeserializer.Builder builder =
+                new InLongMsgCsvFormatDeserializer.Builder(TEST_ROW_INFO);
+        builder.setTimeFieldName("inlongmsg_time");
+        builder.setAttributesFieldName("inlongmsg_attributes");
+        builder.setIgnoreErrors(true);
+        InLongMsgCsvFormatDeserializer deserializer = builder.build();
+
+        String head = "m=0&streamId=testInterfaceId&t=20200322&__addcol1__=1&__addcol2__=2";
+        InLongMsg message = InLongMsg.newInLongMsg();
+        message.addMsg(head, "aaa,field11,field12,field13".getBytes());
+        message.addMsg(head, "123,field21,field22,field23".getBytes());
+
+        Map<String, String> attributeMap = new HashMap<>();
+        attributeMap.put("m", "0");
+        attributeMap.put("streamId", "testInterfaceId");
+        attributeMap.put("t", "20200322");
+        attributeMap.put("__addcol1__", "1");
+        attributeMap.put("__addcol2__", "2");
+
+        // Only the well-formed second record is expected to survive.
+        GenericRowData survivingRow = GenericRowData.of(
+                TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00")),
+                mapConvert.convert(attributeMap),
+                1,
+                2,
+                123,
+                StringData.fromString("field21"),
+                StringData.fromString("field22"),
+                StringData.fromString("field23"));
+
+        testRowDeserialization(
+                deserializer,
+                message.buildArray(),
+                Collections.singletonList(survivingRow));
+    }
+
+    /**
+     * Checks the {@code deleteHeadDelimiter(true)} setting: the body begins with a
+     * delimiter, and the expected row contains the six values that follow it with
+     * no empty leading column.
+     */
+    @Test
+    public void testDeleteHeadDelimiter() throws Exception {
+        InLongMsgCsvFormatDeserializer.Builder builder =
+                new InLongMsgCsvFormatDeserializer.Builder(TEST_ROW_INFO);
+        builder.setTimeFieldName("inlongmsg_time");
+        builder.setAttributesFieldName("inlongmsg_attributes");
+        builder.setDeleteHeadDelimiter(true);
+        InLongMsgCsvFormatDeserializer deserializer = builder.build();
+
+        InLongMsg message = InLongMsg.newInLongMsg();
+        message.addMsg(
+                "m=0&streamId=testInterfaceId&t=20200322",
+                ",1,2,3,field1,field2,field3".getBytes());
+
+        Map<String, String> attributeMap = new HashMap<>();
+        attributeMap.put("m", "0");
+        attributeMap.put("streamId", "testInterfaceId");
+        attributeMap.put("t", "20200322");
+
+        GenericRowData expectedRow = GenericRowData.of(
+                TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00")),
+                mapConvert.convert(attributeMap),
+                1,
+                2,
+                3,
+                StringData.fromString("field1"),
+                StringData.fromString("field2"),
+                StringData.fromString("field3"));
+
+        testRowDeserialization(
+                deserializer,
+                message.buildArray(),
+                Collections.singletonList(expectedRow));
+    }
+
+    /**
+     * Checks the {@code deleteHeadDelimiter(false)} setting: the body begins with a
+     * delimiter, and the expected row has {@code null} in its first data column,
+     * followed by the remaining values from the body.
+     */
+    @Test
+    public void testRetainHeadDelimiter() throws Exception {
+        InLongMsgCsvFormatDeserializer.Builder builder =
+                new InLongMsgCsvFormatDeserializer.Builder(TEST_ROW_INFO);
+        builder.setTimeFieldName("inlongmsg_time");
+        builder.setAttributesFieldName("inlongmsg_attributes");
+        builder.setDeleteHeadDelimiter(false);
+        InLongMsgCsvFormatDeserializer deserializer = builder.build();
+
+        InLongMsg message = InLongMsg.newInLongMsg();
+        message.addMsg(
+                "m=0&streamId=testInterfaceId&t=20200322",
+                ",1,2,field1,field2,field3".getBytes());
+
+        Map<String, String> attributeMap = new HashMap<>();
+        attributeMap.put("m", "0");
+        attributeMap.put("streamId", "testInterfaceId");
+        attributeMap.put("t", "20200322");
+
+        // The empty leading field is expected as null in the first integer column.
+        GenericRowData expectedRow = GenericRowData.of(
+                TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00")),
+                mapConvert.convert(attributeMap),
+                null,
+                1,
+                2,
+                StringData.fromString("field1"),
+                StringData.fromString("field2"),
+                StringData.fromString("field3"));
+
+        testRowDeserialization(
+                deserializer,
+                message.buildArray(),
+                Collections.singletonList(expectedRow));
+    }
+
+    /**
+     * Checks records whose field count does not match the row schema: a record
+     * with one field too few is expected to yield {@code null} in the last column,
+     * and a record with one field too many is expected to lose the surplus field.
+     */
+    @Test
+    public void testUnmatchedFields1() throws Exception {
+        InLongMsgCsvFormatDeserializer.Builder builder =
+                new InLongMsgCsvFormatDeserializer.Builder(TEST_ROW_INFO);
+        builder.setTimeFieldName("inlongmsg_time");
+        builder.setAttributesFieldName("inlongmsg_attributes");
+        InLongMsgCsvFormatDeserializer deserializer = builder.build();
+
+        String head = "m=0&streamId=testInterfaceId&t=20200322&__addcol1__=1&__addcol2__=2";
+        InLongMsg message = InLongMsg.newInLongMsg();
+        message.addMsg(head, "123,field11,field12".getBytes());
+        message.addMsg(head, "123,field21,field22,field23,field24".getBytes());
+
+        Map<String, String> attributeMap = new HashMap<>();
+        attributeMap.put("m", "0");
+        attributeMap.put("streamId", "testInterfaceId");
+        attributeMap.put("t", "20200322");
+        attributeMap.put("__addcol1__", "1");
+        attributeMap.put("__addcol2__", "2");
+
+        TimestampData eventTime =
+                TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00"));
+
+        // Short record: the missing last field is expected as null.
+        GenericRowData shortRecordRow = GenericRowData.of(
+                eventTime,
+                mapConvert.convert(attributeMap),
+                1,
+                2,
+                123,
+                StringData.fromString("field11"),
+                StringData.fromString("field12"),
+                null);
+
+        // Long record: "field24" is not expected in the output.
+        GenericRowData longRecordRow = GenericRowData.of(
+                eventTime,
+                mapConvert.convert(attributeMap),
+                1,
+                2,
+                123,
+                StringData.fromString("field21"),
+                StringData.fromString("field22"),
+                StringData.fromString("field23"));
+
+        testRowDeserialization(
+                deserializer,
+                message.buildArray(),
+                Arrays.asList(shortRecordRow, longRecordRow));
+    }
+
+    /**
+     * Checks a message with seven {@code __addcolN__} attributes and a two-field
+     * body: the expected row takes its six data columns from the first six
+     * attribute values, and neither the seventh attribute value nor the body
+     * fields appear in the data columns.
+     */
+    @Test
+    public void testUnmatchedFields2() throws Exception {
+        InLongMsgCsvFormatDeserializer.Builder builder =
+                new InLongMsgCsvFormatDeserializer.Builder(TEST_ROW_INFO);
+        builder.setTimeFieldName("inlongmsg_time");
+        builder.setAttributesFieldName("inlongmsg_attributes");
+        InLongMsgCsvFormatDeserializer deserializer = builder.build();
+
+        String head = "m=0&streamId=testInterfaceId&t=20200322&__addcol1__=1&"
+                + "__addcol2__=2&__addcol3__=3&__addcol4__=4&__addcol5__=5&"
+                + "__addcol6__=6&__addcol7__=7";
+        InLongMsg message = InLongMsg.newInLongMsg();
+        message.addMsg(head, "field11,field12".getBytes());
+
+        Map<String, String> attributeMap = new HashMap<>();
+        attributeMap.put("m", "0");
+        attributeMap.put("streamId", "testInterfaceId");
+        attributeMap.put("t", "20200322");
+        attributeMap.put("__addcol1__", "1");
+        attributeMap.put("__addcol2__", "2");
+        attributeMap.put("__addcol3__", "3");
+        attributeMap.put("__addcol4__", "4");
+        attributeMap.put("__addcol5__", "5");
+        attributeMap.put("__addcol6__", "6");
+        attributeMap.put("__addcol7__", "7");
+
+        GenericRowData expectedRow = GenericRowData.of(
+                TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00")),
+                mapConvert.convert(attributeMap),
+                1,
+                2,
+                3,
+                StringData.fromString("4"),
+                StringData.fromString("5"),
+                StringData.fromString("6"));
+
+        testRowDeserialization(
+                deserializer,
+                message.buildArray(),
+                Collections.singletonList(expectedRow));
+    }
+
+    /**
+     * Checks the {@code lineDelimiter('\n')} setting: a single message body holding
+     * two newline-separated records is expected to produce two rows.
+     */
+    @Test
+    public void testLineDelimiter() throws Exception {
+
+        InLongMsgCsvFormatDeserializer.Builder builder =
+                new InLongMsgCsvFormatDeserializer.Builder(TEST_ROW_INFO);
+        builder.setTimeFieldName("inlongmsg_time");
+        builder.setAttributesFieldName("inlongmsg_attributes");
+        builder.setLineDelimiter('\n');
+        InLongMsgCsvFormatDeserializer deserializer = builder.build();
+
+        // One message, two records separated by the configured line delimiter.
+        InLongMsg message = InLongMsg.newInLongMsg();
+        message.addMsg(
+                "m=0&streamId=testInterfaceId&t=20200322&__addcol1__=1&__addcol2__=2",
+                "123,field11,field12,field13\n123,field21,field22,field23".getBytes());
+
+        Map<String, String> attributeMap = new HashMap<>();
+        attributeMap.put("m", "0");
+        attributeMap.put("streamId", "testInterfaceId");
+        attributeMap.put("t", "20200322");
+        attributeMap.put("__addcol1__", "1");
+        attributeMap.put("__addcol2__", "2");
+
+        TimestampData eventTime =
+                TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00"));
+
+        GenericRowData firstExpected = GenericRowData.of(
+                eventTime,
+                mapConvert.convert(attributeMap),
+                1,
+                2,
+                123,
+                StringData.fromString("field11"),
+                StringData.fromString("field12"),
+                StringData.fromString("field13"));
+
+        GenericRowData secondExpected = GenericRowData.of(
+                eventTime,
+                mapConvert.convert(attributeMap),
+                1,
+                2,
+                123,
+                StringData.fromString("field21"),
+                StringData.fromString("field22"),
+                StringData.fromString("field23"));
+
+        testRowDeserialization(
+                deserializer,
+                message.buildArray(),
+                Arrays.asList(firstExpected, secondExpected));
+    }
+
+    /**
+     * Checks metadata support: with the stream id configured as the only metadata
+     * key, the expected row has nine fields and the last one holds the
+     * {@code streamId} attribute value.
+     */
+    @Test
+    public void testMetadata() throws Exception {
+
+        InLongMsgCsvFormatDeserializer.Builder builder =
+                new InLongMsgCsvFormatDeserializer.Builder(TEST_ROW_INFO);
+        builder.setTimeFieldName("inlongmsg_time");
+        builder.setAttributesFieldName("inlongmsg_attributes");
+        builder.setMetadataKeys(Collections.singletonList(STREAMID.getKey()));
+        InLongMsgCsvFormatDeserializer deserializer = builder.build();
+
+        InLongMsg message = InLongMsg.newInLongMsg();
+        message.addMsg(
+                "m=0&streamId=testInterfaceId&t=20200322&__addcol1__=1&__addcol2__=2",
+                "123,field11,field12,field13".getBytes());
+
+        Map<String, String> attributeMap = new HashMap<>();
+        attributeMap.put("m", "0");
+        attributeMap.put("streamId", "testInterfaceId");
+        attributeMap.put("t", "20200322");
+        attributeMap.put("__addcol1__", "1");
+        attributeMap.put("__addcol2__", "2");
+
+        // Eight physical columns followed by the stream-id metadata column.
+        GenericRowData expectedRow = GenericRowData.of(
+                TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00")),
+                mapConvert.convert(attributeMap),
+                1,
+                2,
+                123,
+                StringData.fromString("field11"),
+                StringData.fromString("field12"),
+                StringData.fromString("field13"),
+                StringData.fromString("testInterfaceId"));
+
+        testRowDeserialization(
+                deserializer,
+                message.buildArray(),
+                Collections.singletonList(expectedRow));
+    }
+
+    /**
+     * Runs {@code deserializer.flatMap} on the given bytes, collecting every
+     * emitted row into a list, and asserts that the list equals the expected rows.
+     *
+     * @param deserializer the deserializer under test
+     * @param bytes the serialized InLongMsg passed to {@code flatMap}
+     * @param expectedRows the rows expected to be emitted, in order
+     * @throws Exception if {@code flatMap} fails
+     */
+    private void testRowDeserialization(
+            InLongMsgCsvFormatDeserializer deserializer,
+            byte[] bytes,
+            List<RowData> expectedRows) throws Exception {
+
+        List<RowData> emitted = new ArrayList<>();
+        deserializer.flatMap(bytes, new ListCollector<>(emitted));
+
+        assertEquals(expectedRows, emitted);
+    }
+
+    /**
+     * A {@link FailureHandler} that only counts how often each callback is
+     * invoked, so tests can assert on the number of failures of each kind.
+     */
+    private static class TestFailureHandler implements FailureHandler {
+
+        // Invocation counters, one per callback; all start at zero.
+        private int headCount;
+        private int bodyCount;
+        private int rowCount;
+
+        /** Counts one head-parsing failure. */
+        @Override
+        public void onParsingHeadFailure(String attribute, Exception exception) throws Exception {
+            headCount += 1;
+        }
+
+        /** Counts one body-parsing failure. */
+        @Override
+        public void onParsingBodyFailure(byte[] body, Exception exception) throws Exception {
+            bodyCount += 1;
+        }
+
+        /** Counts one row-conversion failure. */
+        @Override
+        public void onConvertingRowFailure(InLongMsgHead head, InLongMsgBody body,
+                Exception exception) throws Exception {
+            rowCount += 1;
+        }
+
+        /** Returns the number of head-parsing failures seen so far. */
+        public int getHeadCount() {
+            return this.headCount;
+        }
+
+        /** Returns the number of body-parsing failures seen so far. */
+        public int getBodyCount() {
+            return this.bodyCount;
+        }
+
+        /** Returns the number of row-conversion failures seen so far. */
+        public int getRowCount() {
+            return this.rowCount;
+        }
+    }
+}
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatFactoryTest.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatFactoryTest.java
new file mode 100644
index 0000000000..bc524678a3
--- /dev/null
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatFactoryTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsgcsv;
+
+import org.apache.inlong.sort.formats.common.DateFormatInfo;
+import org.apache.inlong.sort.formats.common.FormatInfo;
+import org.apache.inlong.sort.formats.common.FormatUtils;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.MapFormatInfo;
+import org.apache.inlong.sort.formats.common.RowFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.TestDynamicTableFactory;
+import 
org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static 
org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link InLongMsgCsvFormatFactory}.
+ *
+ * <p>The test builds a table source from a set of table options and checks that the
+ * runtime decoder it creates equals a hand-built
+ * {@link InLongMsgCsvRowDataDeserializationSchema}.
+ */
+public class InLongMsgCsvFormatFactoryTest {
+
+    /** Row format description marshalled into the table options. */
+    public RowFormatInfo testFormatInfo;
+
+    /** Type information derived from the physical row type in {@link #setup()}. */
+    public TypeInformation<RowData> testTypeInformation;
+
+    /** Schema handed to the mocked table source. */
+    public ResolvedSchema resolvedSchema;
+
+    /** Physical row data type of {@link #resolvedSchema}. */
+    public DataType dataType;
+
+    /**
+     * Prepares the schema, its physical data type, the matching type information
+     * and the row format info shared by the tests.
+     */
+    @Before
+    public void setup() {
+        resolvedSchema = ResolvedSchema.of(
+                Column.physical("time", DataTypes.TIMESTAMP()),
+                Column.physical("attributes",
+                        DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())),
+                Column.physical("student_name", DataTypes.STRING()),
+                Column.physical("score", DataTypes.INT()),
+                Column.physical("date", DataTypes.DATE()));
+        dataType = resolvedSchema.toPhysicalRowDataType();
+
+        RowType physicalRowType = (RowType) dataType.getLogicalType();
+        testTypeInformation = InternalTypeInfo.of(physicalRowType);
+
+        // Field names and formats mirror the columns of the resolved schema.
+        String[] fieldNames = {"time", "attributes", "student_name", "score", "date"};
+        FormatInfo[] fieldFormats = {
+                new TimestampFormatInfo(),
+                new MapFormatInfo(StringFormatInfo.INSTANCE, StringFormatInfo.INSTANCE),
+                StringFormatInfo.INSTANCE,
+                IntFormatInfo.INSTANCE,
+                new DateFormatInfo("yyyy-MM-dd")
+        };
+        testFormatInfo = new RowFormatInfo(fieldNames, fieldFormats);
+    }
+
+    /**
+     * The decoder created from the table options must equal a schema built
+     * directly with the same delimiter, quote, escape and null-literal settings.
+     * NOTE(review): the options do not set a charset while the expected schema
+     * sets "UTF-8"; presumably that is the factory default - confirm.
+     */
+    @Test
+    public void testDeSchema() {
+        InLongMsgCsvRowDataDeserializationSchema.Builder expectedBuilder =
+                new InLongMsgCsvRowDataDeserializationSchema.Builder(testFormatInfo);
+        final InLongMsgCsvRowDataDeserializationSchema expected =
+                expectedBuilder
+                        .setCharset("UTF-8")
+                        .setFieldDelimiter(';')
+                        .setQuoteCharacter('\'')
+                        .setEscapeCharacter('\\')
+                        .setNullLiteral("n/a")
+                        .build();
+
+        DeserializationSchema<RowData> actual = createDeserializationSchema(getAllOptions());
+
+        assertEquals(expected, actual);
+    }
+
+    /**
+     * Creates a table source from the given options and returns the runtime
+     * decoder produced by the value format of the resulting source mock.
+     */
+    private DeserializationSchema<RowData> createDeserializationSchema(
+            Map<String, String> options) {
+        final DynamicTableSource actualSource = createTableSource(resolvedSchema, options);
+        assert actualSource instanceof TestDynamicTableFactory.DynamicTableSourceMock;
+
+        TestDynamicTableFactory.DynamicTableSourceMock mockedSource =
+                (TestDynamicTableFactory.DynamicTableSourceMock) actualSource;
+        return mockedSource.valueFormat.createRuntimeDecoder(
+                ScanRuntimeProviderContext.INSTANCE, dataType);
+    }
+
+    /**
+     * Returns the table options for the test connector together with the
+     * InLongMsg CSV format options used by {@link #testDeSchema()}.
+     */
+    private Map<String, String> getAllOptions() {
+        final Map<String, String> tableOptions = new HashMap<>();
+
+        // Options of the test connector.
+        tableOptions.put("connector", TestDynamicTableFactory.IDENTIFIER);
+        tableOptions.put("target", "MyTarget");
+        tableOptions.put("buffer-size", "1000");
+
+        // Options of the format under test.
+        tableOptions.put("format", InLongMsgCsvFormatFactory.IDENTIFIER);
+        tableOptions.put("inlong-msg-csv.row.format.info", FormatUtils.marshall(testFormatInfo));
+        tableOptions.put("inlong-msg-csv.format.field-delimiter", ";");
+        tableOptions.put("inlong-msg-csv.format.quote-character", "'");
+        tableOptions.put("inlong-msg-csv.format.escape-character", "\\");
+        tableOptions.put("inlong-msg-csv.format.null-literal", "n/a");
+        return tableOptions;
+    }
+}
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/test/resources/log4j-test.properties
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000000..e07d62aefe
--- /dev/null
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to INFO and its only appender to A1.
+log4j.rootLogger=INFO, A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=[%t] %-5p %l %x - %m%n
diff --git a/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/pom.xml 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/pom.xml
index d3f477164b..25b69037a0 100644
--- a/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/pom.xml
+++ b/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/pom.xml
@@ -96,52 +96,4 @@
             <scope>test</scope>
         </dependency>
     </dependencies>
-    <profiles>
-        <profile>
-            <id>japicmp-report</id>
-            <activation>
-                <property>
-                    <name>japicmp-report</name>
-                </property>
-            </activation>
-            <build>
-                <plugins>
-                    <plugin>
-                        <groupId>com.github.siom79.japicmp</groupId>
-                        <artifactId>japicmp-maven-plugin</artifactId>
-                        <configuration>
-                            <parameter>
-                                
<breakBuildOnBinaryIncompatibleModifications>false</breakBuildOnBinaryIncompatibleModifications>
-                                
<breakBuildOnSourceIncompatibleModifications>false</breakBuildOnSourceIncompatibleModifications>
-                            </parameter>
-                        </configuration>
-                    </plugin>
-                </plugins>
-            </build>
-        </profile>
-        <profile>
-            <id>japicmp-check</id>
-            <activation>
-                <property>
-                    <name>!japicmp-report</name>
-                </property>
-            </activation>
-            <build>
-                <plugins>
-                    <plugin>
-                        <groupId>com.github.siom79.japicmp</groupId>
-                        <artifactId>japicmp-maven-plugin</artifactId>
-                        <configuration>
-                            <parameter>
-                                <excludes>
-                                    
<exclude>org.apache.inlong.sort.flink.formats.csv.CsvRowDataDeserializationSchema</exclude>
-                                    
<exclude>org.apache.inlong.sort.flink.formats.csv.CsvRowDataSerializationSchema</exclude>
-                                </excludes>
-                            </parameter>
-                        </configuration>
-                    </plugin>
-                </plugins>
-            </build>
-        </profile>
-    </profiles>
 </project>
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/pom.xml 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/pom.xml
index 4be8115d88..7f7e854faf 100644
--- a/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/pom.xml
+++ b/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/pom.xml
@@ -114,52 +114,4 @@
             <scope>test</scope>
         </dependency>
     </dependencies>
-    <profiles>
-        <profile>
-            <id>japicmp-report</id>
-            <activation>
-                <property>
-                    <name>japicmp-report</name>
-                </property>
-            </activation>
-            <build>
-                <plugins>
-                    <plugin>
-                        <groupId>com.github.siom79.japicmp</groupId>
-                        <artifactId>japicmp-maven-plugin</artifactId>
-                        <configuration>
-                            <parameter>
-                                
<breakBuildOnBinaryIncompatibleModifications>false</breakBuildOnBinaryIncompatibleModifications>
-                                
<breakBuildOnSourceIncompatibleModifications>false</breakBuildOnSourceIncompatibleModifications>
-                            </parameter>
-                        </configuration>
-                    </plugin>
-                </plugins>
-            </build>
-        </profile>
-        <profile>
-            <id>japicmp-check</id>
-            <activation>
-                <property>
-                    <name>!japicmp-report</name>
-                </property>
-            </activation>
-            <build>
-                <plugins>
-                    <plugin>
-                        <groupId>com.github.siom79.japicmp</groupId>
-                        <artifactId>japicmp-maven-plugin</artifactId>
-                        <configuration>
-                            <parameter>
-                                <excludes>
-                                    
<exclude>org.apache.inlong.sort.formats.json.JsonRowDataDeserializationSchema</exclude>
-                                    
<exclude>org.apache.inlong.sort.formats.json.JsonRowDataSerializationSchema</exclude>
-                                </excludes>
-                            </parameter>
-                        </configuration>
-                    </plugin>
-                </plugins>
-            </build>
-        </profile>
-    </profiles>
 </project>
diff --git a/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/pom.xml 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/pom.xml
index 8156a8b18f..859a8b8816 100644
--- a/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/pom.xml
+++ b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/pom.xml
@@ -91,52 +91,4 @@
             <scope>test</scope>
         </dependency>
     </dependencies>
-    <profiles>
-        <profile>
-            <id>japicmp-report</id>
-            <activation>
-                <property>
-                    <name>japicmp-report</name>
-                </property>
-            </activation>
-            <build>
-                <plugins>
-                    <plugin>
-                        <groupId>com.github.siom79.japicmp</groupId>
-                        <artifactId>japicmp-maven-plugin</artifactId>
-                        <configuration>
-                            <parameter>
-                                
<breakBuildOnBinaryIncompatibleModifications>false</breakBuildOnBinaryIncompatibleModifications>
-                                
<breakBuildOnSourceIncompatibleModifications>false</breakBuildOnSourceIncompatibleModifications>
-                            </parameter>
-                        </configuration>
-                    </plugin>
-                </plugins>
-            </build>
-        </profile>
-        <profile>
-            <id>japicmp-check</id>
-            <activation>
-                <property>
-                    <name>!japicmp-report</name>
-                </property>
-            </activation>
-            <build>
-                <plugins>
-                    <plugin>
-                        <groupId>com.github.siom79.japicmp</groupId>
-                        <artifactId>japicmp-maven-plugin</artifactId>
-                        <configuration>
-                            <parameter>
-                                <excludes>
-                                    
<exclude>org.apache.inlong.sort.formats.kv.KvRowDataDeserializationSchema</exclude>
-                                    
<exclude>org.apache.inlong.sort.formats.kv.KvRowDataSerializationSchema</exclude>
-                                </excludes>
-                            </parameter>
-                        </configuration>
-                    </plugin>
-                </plugins>
-            </build>
-        </profile>
-    </profiles>
 </project>
diff --git a/inlong-sort/sort-formats/format-rowdata/pom.xml 
b/inlong-sort/sort-formats/format-rowdata/pom.xml
index 5f5811b4bb..27528c0dc2 100644
--- a/inlong-sort/sort-formats/format-rowdata/pom.xml
+++ b/inlong-sort/sort-formats/format-rowdata/pom.xml
@@ -39,6 +39,7 @@
         <module>format-inlongmsg-rowdata-base</module>
         <module>format-inlongmsg-rowdata-binlog</module>
         <module>format-inlongmsg-rowdata-pb</module>
+        <module>format-inlongmsg-rowdata-csv</module>
     </modules>
 
     <properties>

Reply via email to