This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new bd93501b52 [INLONG-9597][Sort] Support row way of sort InLong message tlog-csv format (#9602)
bd93501b52 is described below

commit bd93501b5229aff2542bab78f13b083dcc52353e
Author: baomingyu <[email protected]>
AuthorDate: Mon Jan 29 14:21:57 2024 +0800

    [INLONG-9597][Sort] Support row way of sort InLong message tlog-csv format (#9602)
---
 .../format-row/format-inlongmsg-tlogcsv/pom.xml    | 130 ++++++++++
 .../formats/inlongmsgtlogcsv/InLongMsgTlogCsv.java |  72 ++++++
 .../InLongMsgTlogCsvFormatDeserializer.java        | 264 +++++++++++++++++++
 .../InLongMsgTlogCsvFormatFactory.java             | 158 ++++++++++++
 .../InLongMsgTlogCsvMixedFormatConverter.java      | 163 ++++++++++++
 .../InLongMsgTlogCsvMixedFormatDeserializer.java   | 174 +++++++++++++
 .../inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java    | 170 ++++++++++++
 .../InLongMsgTlogCsvValidator.java                 |  40 +++
 .../org.apache.flink.table.factories.TableFactory  |  16 ++
 .../InLongMsgTlogCsvFormatDeserializerTest.java    | 285 +++++++++++++++++++++
 .../InLongMsgTlogCsvFormatFactoryTest.java         | 121 +++++++++
 .../inlongmsgtlogcsv/InLongMsgTlogCsvTest.java     | 108 ++++++++
 .../src/test/resources/log4j-test.properties       |  27 ++
 inlong-sort/sort-formats/format-row/pom.xml        |   1 +
 14 files changed, 1729 insertions(+)

diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/pom.xml 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/pom.xml
new file mode 100644
index 0000000000..d32673c7a0
--- /dev/null
+++ b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/pom.xml
@@ -0,0 +1,130 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.inlong</groupId>
+        <artifactId>format-row</artifactId>
+        <version>1.11.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>sort-format-inlongmsg-tlogcsv</artifactId>
+    <packaging>jar</packaging>
+    <name>Apache InLong - Sort Format-InLongMsg-TLogCSV</name>
+
+    <properties>
+        <inlong.root.dir>${project.parent.parent.parent.parent.basedir}</inlong.root.dir>
+    </properties>
+
+    <dependencies>
+
+        <!-- core dependencies -->
+
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-format-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-format-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-format-inlongmsg-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.xerial.snappy</groupId>
+            <artifactId>snappy-java</artifactId>
+            <version>${snappy.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-shaded-jackson</artifactId>
+            <version>${flink.jackson.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- test dependencies -->
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${flink.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+
+    <profiles>
+        <!-- Create SQL Client uber jars by default -->
+        <profile>
+            <id>sql-jars</id>
+            <activation>
+                <property>
+                    <name>!skipSqlJars</name>
+                </property>
+            </activation>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-jar-plugin</artifactId>
+                        <executions>
+                            <execution>
+                                <goals>
+                                    <goal>jar</goal>
+                                </goals>
+                                <phase>package</phase>
+                                <configuration>
+                                    <classifier>sql-jar</classifier>
+                                </configuration>
+                            </execution>
+                        </executions>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
+</project>
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsv.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsv.java
new file mode 100644
index 0000000000..168ba7b5ec
--- /dev/null
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsv.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsgtlogcsv;
+
+import org.apache.inlong.sort.formats.base.TextFormatDescriptor;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_DELIMITER;
+
+/**
+ * Format descriptor for comma-separated values (CSV).
+ *
+ * <p>This descriptor aims to comply with RFC-4180 ("Common Format and MIME 
Type
+ * for Comma-Separated Values (CSV) Files") proposed by the Internet 
Engineering
+ * Task Force (IETF).
+ */
+public class InLongMsgTlogCsv extends TextFormatDescriptor<InLongMsgTlogCsv> {
+
+    public static final String FORMAT_TYPE_VALUE = "inlong-msg-tlogcsv";
+
+    public InLongMsgTlogCsv() {
+        super(FORMAT_TYPE_VALUE, 1);
+    }
+
+    /**
+     * Sets the delimiter character (',' by default).
+     *
+     * @param delimiter the field delimiter character
+     */
+    public InLongMsgTlogCsv delimiter(char delimiter) {
+        internalProperties.putCharacter(FORMAT_DELIMITER, delimiter);
+        return this;
+    }
+
+    /**
+     * Sets the name of the time field.
+     *
+     * @param timeFieldName The name of the time field.
+     */
+    public InLongMsgTlogCsv timeFieldName(String timeFieldName) {
+        checkNotNull(timeFieldName);
+        internalProperties.putString(InLongMsgUtils.FORMAT_TIME_FIELD_NAME, 
timeFieldName);
+        return this;
+    }
+
+    /**
+     * Sets the name of the attributes field.
+     *
+     * @param attributesFieldName The name of the attributes field.
+     */
+    public InLongMsgTlogCsv attributesFieldName(String attributesFieldName) {
+        checkNotNull(attributesFieldName);
+        
internalProperties.putString(InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME, 
attributesFieldName);
+        return this;
+    }
+}
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java
new file mode 100644
index 0000000000..191df7fd16
--- /dev/null
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsgtlogcsv;
+
+import org.apache.inlong.sort.formats.base.TextFormatBuilder;
+import org.apache.inlong.sort.formats.common.RowFormatInfo;
+import 
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgFormatDeserializer;
+import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.types.Row;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_DELIMITER;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_DELIMITER;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
+
+/**
+ * The deserializer for the records in InLongMsgTlogCsv format.
+ */
+public final class InLongMsgTlogCsvFormatDeserializer extends 
AbstractInLongMsgFormatDeserializer {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Format information describing the result type.
+     */
+    @Nonnull
+    private final RowFormatInfo rowFormatInfo;
+
+    /**
+     * The name of the time field.
+     */
+    @Nullable
+    private final String timeFieldName;
+
+    /**
+     * The name of the attributes field.
+     */
+    @Nullable
+    private final String attributesFieldName;
+
+    /**
+     * The charset of the text.
+     */
+    private final String charset;
+
+    /**
+     * The delimiter between fields.
+     */
+    @Nonnull
+    private final Character delimiter;
+
+    /**
+     * Escape character. Null if escaping is disabled.
+     */
+    @Nullable
+    private final Character escapeChar;
+
+    /**
+     * Quote character. Null if quoting is disabled.
+     */
+    @Nullable
+    private final Character quoteChar;
+
+    /**
+     * The literal represented null values, default "".
+     */
+    @Nullable
+    private final String nullLiteral;
+
+    public InLongMsgTlogCsvFormatDeserializer(
+            @Nonnull RowFormatInfo rowFormatInfo,
+            @Nullable String timeFieldName,
+            @Nullable String attributesFieldName,
+            @Nonnull String charset,
+            @Nonnull Character delimiter,
+            @Nullable Character escapeChar,
+            @Nullable Character quoteChar,
+            @Nullable String nullLiteral,
+            @Nonnull Boolean ignoreErrors) {
+        this(
+                rowFormatInfo,
+                timeFieldName,
+                attributesFieldName,
+                charset,
+                delimiter,
+                escapeChar,
+                quoteChar,
+                nullLiteral,
+                InLongMsgUtils.getDefaultExceptionHandler(ignoreErrors));
+    }
+
+    public InLongMsgTlogCsvFormatDeserializer(
+            @Nonnull RowFormatInfo rowFormatInfo,
+            @Nullable String timeFieldName,
+            @Nullable String attributesFieldName,
+            @Nonnull String charset,
+            @Nonnull Character delimiter,
+            @Nullable Character escapeChar,
+            @Nullable Character quoteChar,
+            @Nullable String nullLiteral,
+            @Nonnull FailureHandler failureHandler) {
+        super(failureHandler);
+
+        this.rowFormatInfo = rowFormatInfo;
+        this.timeFieldName = timeFieldName;
+        this.attributesFieldName = attributesFieldName;
+        this.charset = charset;
+        this.delimiter = delimiter;
+        this.escapeChar = escapeChar;
+        this.quoteChar = quoteChar;
+        this.nullLiteral = nullLiteral;
+    }
+
+    @Override
+    public TypeInformation<Row> getProducedType() {
+        return 
InLongMsgUtils.decorateRowTypeWithNeededHeadFields(timeFieldName, 
attributesFieldName, rowFormatInfo);
+    }
+
+    @Override
+    protected InLongMsgHead parseHead(String attr) throws Exception {
+        return InLongMsgTlogCsvUtils.parseHead(attr);
+    }
+
+    @Override
+    protected List<InLongMsgBody> parseBodyList(byte[] bytes) throws Exception 
{
+        return Collections.singletonList(
+                InLongMsgTlogCsvUtils.parseBody(bytes, charset, delimiter, 
escapeChar, quoteChar));
+    }
+
+    @Override
+    protected List<Row> convertRows(InLongMsgHead head, InLongMsgBody body) 
throws Exception {
+        Row dataRow =
+                InLongMsgTlogCsvUtils.deserializeRow(
+                        rowFormatInfo,
+                        nullLiteral,
+                        head.getPredefinedFields(),
+                        body.getFields());
+
+        Row row = InLongMsgUtils.decorateRowWithNeededHeadFields(
+                timeFieldName,
+                attributesFieldName,
+                head.getTime(),
+                head.getAttributes(),
+                dataRow);
+        return Collections.singletonList(row);
+    }
+
+    /**
+     * The builder for {@link InLongMsgTlogCsvFormatDeserializer}.
+     */
+    public static class Builder extends TextFormatBuilder<Builder> {
+
+        private String timeFieldName = DEFAULT_TIME_FIELD_NAME;
+        private String attributesFieldName = DEFAULT_ATTRIBUTES_FIELD_NAME;
+        private Character delimiter = DEFAULT_DELIMITER;
+
+        public Builder(RowFormatInfo rowFormatInfo) {
+            super(rowFormatInfo);
+        }
+
+        public Builder setTimeFieldName(String timeFieldName) {
+            this.timeFieldName = timeFieldName;
+            return this;
+        }
+
+        public Builder setAttributesFieldName(String attributesFieldName) {
+            this.attributesFieldName = attributesFieldName;
+            return this;
+        }
+
+        public Builder setDelimiter(Character delimiter) {
+            this.delimiter = delimiter;
+            return this;
+        }
+
+        public Builder configure(DescriptorProperties descriptorProperties) {
+            super.configure(descriptorProperties);
+
+            descriptorProperties.getOptionalString(FORMAT_TIME_FIELD_NAME)
+                    .ifPresent(this::setTimeFieldName);
+            
descriptorProperties.getOptionalString(FORMAT_ATTRIBUTES_FIELD_NAME)
+                    .ifPresent(this::setAttributesFieldName);
+            descriptorProperties.getOptionalCharacter(FORMAT_DELIMITER)
+                    .ifPresent(this::setDelimiter);
+
+            return this;
+        }
+
+        public InLongMsgTlogCsvFormatDeserializer build() {
+            return new InLongMsgTlogCsvFormatDeserializer(
+                    rowFormatInfo,
+                    timeFieldName,
+                    attributesFieldName,
+                    charset,
+                    delimiter,
+                    escapeChar,
+                    quoteChar,
+                    nullLiteral,
+                    ignoreErrors);
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        if (!super.equals(o)) {
+            return false;
+        }
+
+        InLongMsgTlogCsvFormatDeserializer that = 
(InLongMsgTlogCsvFormatDeserializer) o;
+        return rowFormatInfo.equals(that.rowFormatInfo) &&
+                Objects.equals(timeFieldName, that.timeFieldName) &&
+                Objects.equals(attributesFieldName, that.attributesFieldName) 
&&
+                Objects.equals(charset, that.charset) &&
+                delimiter.equals(that.delimiter) &&
+                Objects.equals(escapeChar, that.escapeChar) &&
+                Objects.equals(quoteChar, that.quoteChar) &&
+                Objects.equals(nullLiteral, that.nullLiteral);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.hashCode(), rowFormatInfo, timeFieldName,
+                attributesFieldName, charset, delimiter, escapeChar, quoteChar,
+                nullLiteral);
+    }
+}
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatFactory.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatFactory.java
new file mode 100644
index 0000000000..bb83ad6558
--- /dev/null
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatFactory.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsgtlogcsv;
+
+import org.apache.inlong.sort.formats.base.TableFormatDeserializer;
+import 
org.apache.inlong.sort.formats.base.TableFormatDeserializer.TableFormatContext;
+import org.apache.inlong.sort.formats.base.TableFormatDeserializerFactory;
+import org.apache.inlong.sort.formats.common.RowFormatInfo;
+import 
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgMixedFormatConverter;
+import 
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgMixedFormatDeserializer;
+import 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatConverterValidator;
+import 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatDeserializerValidator;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatFactory;
+
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.factories.TableFormatFactoryBase;
+import org.apache.flink.types.Row;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_CHARSET;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_DELIMITER;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_ESCAPE_CHARACTER;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_IGNORE_ERRORS;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_NULL_LITERAL;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_QUOTE_CHARACTER;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_SCHEMA;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.getDataRowFormatInfo;
+
+/**
+ * Table format factory for providing configured instances of 
InLongMsgTlogCsv-to-row
+ * serializer and deserializer.
+ */
+public final class InLongMsgTlogCsvFormatFactory
+        extends
+            TableFormatFactoryBase<Row>
+        implements
+            TableFormatDeserializerFactory,
+            InLongMsgMixedFormatFactory {
+
+    public InLongMsgTlogCsvFormatFactory() {
+        super(InLongMsgTlogCsv.FORMAT_TYPE_VALUE, 1, false);
+    }
+
+    @Override
+    public List<String> supportedFormatProperties() {
+        final List<String> properties = new ArrayList<>();
+        properties.add(FORMAT_DELIMITER);
+        properties.add(FORMAT_ESCAPE_CHARACTER);
+        properties.add(FORMAT_QUOTE_CHARACTER);
+        properties.add(FORMAT_NULL_LITERAL);
+        properties.add(FORMAT_CHARSET);
+        properties.add(FORMAT_IGNORE_ERRORS);
+        properties.add(FORMAT_SCHEMA);
+        properties.add(FORMAT_TIME_FIELD_NAME);
+        properties.add(FORMAT_ATTRIBUTES_FIELD_NAME);
+
+        return properties;
+    }
+
+    @Override
+    public InLongMsgTlogCsvFormatDeserializer createFormatDeserializer(
+            Map<String, String> properties) {
+        final DescriptorProperties descriptorProperties =
+                new DescriptorProperties(true);
+        descriptorProperties.putProperties(properties);
+
+        InLongMsgTlogCsvValidator validator = new InLongMsgTlogCsvValidator();
+        validator.validate(descriptorProperties);
+
+        RowFormatInfo rowFormatInfo = 
getDataRowFormatInfo(descriptorProperties);
+
+        InLongMsgTlogCsvFormatDeserializer.Builder builder =
+                new InLongMsgTlogCsvFormatDeserializer.Builder(rowFormatInfo);
+        builder.configure(descriptorProperties);
+
+        return builder.build();
+    }
+
+    @Override
+    public TableFormatDeserializer createFormatDeserializer(TableFormatContext 
context) {
+        TableFormatDeserializer deserializer =
+                createFormatDeserializer(context.getFormatProperties());
+        deserializer.init(context);
+        return deserializer;
+    }
+
+    @Override
+    public InLongMsgTlogCsvMixedFormatDeserializer 
createMixedFormatDeserializer(
+            Map<String, String> properties) {
+        final DescriptorProperties descriptorProperties =
+                new DescriptorProperties(true);
+        descriptorProperties.putProperties(properties);
+
+        InLongMsgMixedFormatDeserializerValidator validator =
+                new InLongMsgMixedFormatDeserializerValidator();
+        validator.validate(descriptorProperties);
+
+        InLongMsgTlogCsvMixedFormatDeserializer.Builder builder =
+                new InLongMsgTlogCsvMixedFormatDeserializer.Builder();
+        builder.configure(descriptorProperties);
+
+        return builder.build();
+    }
+
+    @Override
+    public AbstractInLongMsgMixedFormatConverter createMixedFormatConverter(
+            AbstractInLongMsgMixedFormatConverter.TableFormatContext context) {
+        return createMixedFormatConverter(context.getFormatProperties());
+    }
+
+    @Override
+    public AbstractInLongMsgMixedFormatDeserializer 
createMixedFormatDeserializer(
+            TableFormatContext context) {
+        InLongMsgTlogCsvMixedFormatDeserializer deserializer =
+                createMixedFormatDeserializer(context.getFormatProperties());
+        deserializer.init(context);
+        return deserializer;
+    }
+
+    @Override
+    public InLongMsgTlogCsvMixedFormatConverter createMixedFormatConverter(
+            Map<String, String> properties) {
+        final DescriptorProperties descriptorProperties =
+                new DescriptorProperties(true);
+        descriptorProperties.putProperties(properties);
+
+        InLongMsgMixedFormatConverterValidator validator =
+                new InLongMsgMixedFormatConverterValidator();
+        validator.validate(descriptorProperties);
+
+        RowFormatInfo rowFormatInfo = 
getDataRowFormatInfo(descriptorProperties);
+        InLongMsgTlogCsvMixedFormatConverter.Builder builder =
+                new 
InLongMsgTlogCsvMixedFormatConverter.Builder(rowFormatInfo);
+        builder.configure(descriptorProperties);
+
+        return builder.build();
+    }
+}
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvMixedFormatConverter.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvMixedFormatConverter.java
new file mode 100644
index 0000000000..da210ed2ce
--- /dev/null
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvMixedFormatConverter.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsgtlogcsv;
+
+import org.apache.inlong.sort.formats.common.RowFormatInfo;
+import 
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgMixedFormatConverter;
+import 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatConverterBuilder;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.types.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.sql.Timestamp;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Converter used to deserialize a mixed row in InLongMsg-Tlog-CSV format.
+ */
+public class InLongMsgTlogCsvMixedFormatConverter extends 
AbstractInLongMsgMixedFormatConverter {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(
+            InLongMsgTlogCsvMixedFormatConverter.class);
+
+    /**
+     * The schema of the rows.
+     */
+    @Nonnull
+    private final RowFormatInfo rowFormatInfo;
+
+    /**
+     * The name of the time field.
+     */
+    @Nullable
+    private final String timeFieldName;
+
+    /**
+     * The name of the attributes field.
+     */
+    @Nullable
+    private final String attributesFieldName;
+
+    /**
+     * The literal representing null values.
+     */
+    @Nullable
+    private final String nullLiteral;
+
+    public InLongMsgTlogCsvMixedFormatConverter(
+            @Nonnull RowFormatInfo rowFormatInfo,
+            @Nullable String timeFieldName,
+            @Nullable String attributesFieldName,
+            @Nullable String nullLiteral,
+            boolean ignoreErrors) {
+        super(ignoreErrors);
+
+        this.rowFormatInfo = rowFormatInfo;
+        this.timeFieldName = timeFieldName;
+        this.attributesFieldName = attributesFieldName;
+        this.nullLiteral = nullLiteral;
+    }
+
+    @Override
+    public TypeInformation<Row> getProducedType() {
+        return 
InLongMsgUtils.decorateRowTypeWithNeededHeadFields(timeFieldName, 
attributesFieldName, rowFormatInfo);
+    }
+
+    @Override
+    public List<Row> convertRows(
+            Map<String, String> attributes,
+            byte[] data,
+            String tid,
+            Timestamp time,
+            List<String> predefinedFields,
+            List<String> fields,
+            Map<String, String> entries) throws Exception {
+        Row dataRow =
+                InLongMsgTlogCsvUtils.deserializeRow(
+                        rowFormatInfo,
+                        nullLiteral,
+                        predefinedFields,
+                        fields);
+
+        Row row = InLongMsgUtils.decorateRowWithNeededHeadFields(
+                timeFieldName,
+                attributesFieldName,
+                time,
+                attributes,
+                dataRow);
+
+        return Collections.singletonList(row);
+    }
+
+    /**
+     * The builder for {@link InLongMsgTlogCsvMixedFormatConverter}.
+     */
+    public static class Builder extends InLongMsgMixedFormatConverterBuilder {
+
+        public Builder(RowFormatInfo rowFormatInfo) {
+            super(rowFormatInfo);
+        }
+
+        public InLongMsgTlogCsvMixedFormatConverter build() {
+            return new InLongMsgTlogCsvMixedFormatConverter(
+                    rowFormatInfo,
+                    timeFieldName,
+                    attributesFieldName,
+                    nullLiteral,
+                    ignoreErrors);
+        }
+    }
+
+    @Override
+    public boolean equals(Object object) {
+        if (this == object) {
+            return true;
+        }
+
+        if (object == null || getClass() != object.getClass()) {
+            return false;
+        }
+
+        if (!super.equals(object)) {
+            return false;
+        }
+
+        InLongMsgTlogCsvMixedFormatConverter that = 
(InLongMsgTlogCsvMixedFormatConverter) object;
+        return rowFormatInfo.equals(that.rowFormatInfo) &&
+                Objects.equals(timeFieldName, that.timeFieldName) &&
+                Objects.equals(attributesFieldName, that.attributesFieldName) 
&&
+                Objects.equals(nullLiteral, that.nullLiteral);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(rowFormatInfo, timeFieldName, attributesFieldName,
+                nullLiteral);
+    }
+}
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvMixedFormatDeserializer.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvMixedFormatDeserializer.java
new file mode 100644
index 0000000000..ca0f0b9f86
--- /dev/null
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvMixedFormatDeserializer.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsgtlogcsv;
+
+import 
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgMixedFormatDeserializer;
+import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
+import 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgTextMixedFormatDeserializerBuilder;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.types.Row;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_DELIMITER;
+
+/**
+ * The deserializer for the records in InLongMsgTlogCsv format.
+ */
+public final class InLongMsgTlogCsvMixedFormatDeserializer
+        extends
+            AbstractInLongMsgMixedFormatDeserializer {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * The charset of the text.
+     */
+    private final String charset;
+
+    /**
+     * The delimiter between fields.
+     */
+    @Nonnull
+    private final Character delimiter;
+
+    /**
+     * Escape character. Null if escaping is disabled.
+     */
+    @Nullable
+    private final Character escapeChar;
+
+    /**
+     * Quote character. Null if quoting is disabled.
+     */
+    @Nullable
+    private final Character quoteChar;
+
+    /**
+     * Creates a deserializer whose failure handler is the default one selected by
+     * {@code ignoreErrors}.
+     *
+     * @param charset The charset of the text.
+     * @param delimiter The delimiter between fields.
+     * @param escapeChar The escape character, or null if escaping is disabled.
+     * @param quoteChar The quote character, or null if quoting is disabled.
+     * @param ignoreErrors Passed to {@link InLongMsgUtils#getDefaultExceptionHandler} to obtain
+     *        the failure handler.
+     */
+    public InLongMsgTlogCsvMixedFormatDeserializer(
+            @Nonnull String charset,
+            @Nonnull Character delimiter,
+            @Nullable Character escapeChar,
+            @Nullable Character quoteChar,
+            @Nonnull Boolean ignoreErrors) {
+        this(charset, delimiter, escapeChar, quoteChar, InLongMsgUtils.getDefaultExceptionHandler(ignoreErrors));
+    }
+
+    /**
+     * Creates a deserializer that reports failures to the given handler.
+     *
+     * @param charset The charset of the text.
+     * @param delimiter The delimiter between fields.
+     * @param escapeChar The escape character, or null if escaping is disabled.
+     * @param quoteChar The quote character, or null if quoting is disabled.
+     * @param failureHandler The handler forwarded to the parent deserializer.
+     */
+    public InLongMsgTlogCsvMixedFormatDeserializer(
+            @Nonnull String charset,
+            @Nonnull Character delimiter,
+            @Nullable Character escapeChar,
+            @Nullable Character quoteChar,
+            @Nonnull FailureHandler failureHandler) {
+        super(failureHandler);
+
+        this.delimiter = delimiter;
+        this.charset = charset;
+        this.escapeChar = escapeChar;
+        this.quoteChar = quoteChar;
+    }
+
+    @Override
+    public TypeInformation<Row> getProducedType() {
+        return InLongMsgUtils.MIXED_ROW_TYPE;
+    }
+
+    /**
+     * Parses the attribute string with {@link InLongMsgTlogCsvUtils#parseHead(String)}.
+     */
+    @Override
+    protected InLongMsgHead parseHead(String attr) throws Exception {
+        return InLongMsgTlogCsvUtils.parseHead(attr);
+    }
+
+    /**
+     * Parses the whole byte array as a single body, using the configured charset, delimiter,
+     * escape character and quote character.
+     *
+     * @return A one-element list holding the parsed body.
+     */
+    @Override
+    protected List<InLongMsgBody> parseBodyList(byte[] bytes) throws Exception {
+        // The Character field is unboxed to the char parameter here, so it must never be null.
+        return Collections.singletonList(
+                InLongMsgTlogCsvUtils.parseBody(bytes, charset, delimiter, escapeChar, quoteChar));
+    }
+
+    /**
+     * Builds one mixed row from the head and body through {@link InLongMsgUtils#buildMixedRow},
+     * passing the tid of the body as its third argument.
+     *
+     * @return A one-element list holding the row.
+     */
+    @Override
+    protected List<Row> convertRows(InLongMsgHead head, InLongMsgBody body) throws Exception {
+        Row row = InLongMsgUtils.buildMixedRow(head, body, body.getTid());
+        return Collections.singletonList(row);
+    }
+
+    /**
+     * The builder for {@link InLongMsgTlogCsvMixedFormatDeserializer}.
+     */
+    public static class Builder extends InLongMsgTextMixedFormatDeserializerBuilder<Builder> {
+
+        /**
+         * The delimiter between fields. It starts at the shared default because
+         * {@link #configure(DescriptorProperties)} only overrides it when the property is present;
+         * without a default, a builder that never received a delimiter would produce a
+         * deserializer that fails with a NullPointerException on every record.
+         */
+        private Character delimiter =
+                org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_DELIMITER;
+
+        /**
+         * Sets the delimiter between fields.
+         *
+         * @param delimiter The delimiter between fields.
+         * @return This builder.
+         */
+        public Builder setDelimiter(char delimiter) {
+            this.delimiter = delimiter;
+            return this;
+        }
+
+        /**
+         * Applies the parent configuration, then takes the delimiter from
+         * {@code FORMAT_DELIMITER} when that property is present.
+         *
+         * @param descriptorProperties The properties to read from.
+         * @return This builder.
+         */
+        @Override
+        public Builder configure(DescriptorProperties descriptorProperties) {
+            super.configure(descriptorProperties);
+
+            descriptorProperties.getOptionalCharacter(FORMAT_DELIMITER)
+                    .ifPresent(this::setDelimiter);
+
+            return this;
+        }
+
+        /**
+         * Creates the deserializer from the values currently held by this builder.
+         *
+         * @return A new {@link InLongMsgTlogCsvMixedFormatDeserializer}.
+         */
+        public InLongMsgTlogCsvMixedFormatDeserializer build() {
+            return new InLongMsgTlogCsvMixedFormatDeserializer(
+                    charset,
+                    delimiter,
+                    escapeChar,
+                    quoteChar,
+                    ignoreErrors);
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        if (!super.equals(o)) {
+            return false;
+        }
+        InLongMsgTlogCsvMixedFormatDeserializer that = (InLongMsgTlogCsvMixedFormatDeserializer) o;
+        return charset.equals(that.charset) &&
+                delimiter.equals(that.delimiter) &&
+                Objects.equals(escapeChar, that.escapeChar) &&
+                Objects.equals(quoteChar, that.quoteChar);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.hashCode(), charset, delimiter, escapeChar,
+                quoteChar);
+    }
+}
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
new file mode 100644
index 0000000000..8a5ef167a2
--- /dev/null
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsgtlogcsv;
+
+import org.apache.inlong.sort.formats.common.FormatInfo;
+import org.apache.inlong.sort.formats.common.RowFormatInfo;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
+
+import org.apache.flink.types.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.Charset;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static 
org.apache.inlong.sort.formats.base.TableFormatUtils.deserializeBasicField;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TIME_DT;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TIME_T;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.getPredefinedFields;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseAttr;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseDateTime;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseEpochTime;
+import static org.apache.inlong.sort.formats.util.StringUtils.splitCsv;
+
+/**
+ * Utilities for {@link InLongMsgTlogCsv}.
+ */
+public class InLongMsgTlogCsvUtils {
+
+    private static final Logger LOG = LoggerFactory.getLogger(InLongMsgTlogCsvUtils.class);
+
+    /**
+     * Parses the attribute string of a message into an {@link InLongMsgHead}.
+     *
+     * <p>The time is read from the {@code INLONGMSG_ATTR_TIME_T} attribute (via
+     * {@code parseDateTime}) when that attribute exists, otherwise from
+     * {@code INLONGMSG_ATTR_TIME_DT} (via {@code parseEpochTime}).</p>
+     *
+     * @param attr The raw attribute string.
+     * @return The head carrying the attributes, the time and the predefined fields.
+     * @throws IllegalArgumentException If neither time attribute is present.
+     */
+    public static InLongMsgHead parseHead(String attr) {
+        Map<String, String> attributes = parseAttr(attr);
+
+        // Extracts time from the attributes
+        Timestamp time;
+
+        if (attributes.containsKey(INLONGMSG_ATTR_TIME_T)) {
+            String date = attributes.get(INLONGMSG_ATTR_TIME_T).trim();
+            time = parseDateTime(date);
+        } else if (attributes.containsKey(INLONGMSG_ATTR_TIME_DT)) {
+            String epoch = attributes.get(INLONGMSG_ATTR_TIME_DT).trim();
+            time = parseEpochTime(epoch);
+        } else {
+            throw new IllegalArgumentException(
+                    "Could not find " + INLONGMSG_ATTR_TIME_T +
+                            " or " + INLONGMSG_ATTR_TIME_DT + " in attributes!");
+        }
+
+        // Extracts predefined fields from the attributes
+        List<String> predefinedFields = getPredefinedFields(attributes);
+
+        return new InLongMsgHead(attributes, null, time, predefinedFields);
+    }
+
+    /**
+     * Parses the bytes of a record into an {@link InLongMsgBody}.
+     *
+     * <p>The bytes are decoded with the given charset and split with {@code splitCsv}. The first
+     * segment becomes the tid and the remaining segments become the fields. If the first byte
+     * equals the delimiter, that single byte is skipped before decoding.</p>
+     *
+     * @param bytes The raw bytes of the record.
+     * @param charset The name of the charset used to decode the bytes.
+     * @param delimiter The delimiter between fields.
+     * @param escapeChar The escape character, or null if escaping is disabled.
+     * @param quoteChar The quote character, or null if quoting is disabled.
+     * @return The body holding the original bytes, the tid and the fields.
+     * @throws IllegalArgumentException If the bytes are null or empty, or no tid segment is found.
+     */
+    public static InLongMsgBody parseBody(
+            byte[] bytes,
+            String charset,
+            char delimiter,
+            Character escapeChar,
+            Character quoteChar) {
+        // Fail with a clear message instead of an ArrayIndexOutOfBoundsException on bytes[0].
+        if (bytes == null || bytes.length == 0) {
+            throw new IllegalArgumentException("Could not parse an empty body!");
+        }
+
+        // The comparison is made on the raw first byte, before any charset decoding.
+        int offset = bytes[0] == delimiter ? 1 : 0;
+        String text = new String(bytes, offset, bytes.length - offset, Charset.forName(charset));
+
+        String[] segments = splitCsv(text, delimiter, escapeChar, quoteChar);
+        if (segments.length == 0) {
+            throw new IllegalArgumentException("Could not find the tid in the body!");
+        }
+
+        String tid = segments[0];
+        List<String> fields =
+                Arrays.stream(segments, 1, segments.length).collect(Collectors.toList());
+
+        return new InLongMsgBody(bytes, tid, fields, Collections.emptyMap());
+    }
+
+    /**
+     * Deserializes the given fields into the row.
+     *
+     * <p>The predefined fields fill the leading positions of the row and the body fields follow
+     * them. Surplus values are dropped and missing positions are set to null; a mismatch in the
+     * number of fields is only logged as a warning.</p>
+     *
+     * @param rowFormatInfo The format information of the row.
+     * @param nullLiteral The literal for null values.
+     * @param predefinedFields The predefined fields.
+     * @param fields The fields.
+     * @return The row deserialized from the given fields.
+     */
+    public static Row deserializeRow(
+            RowFormatInfo rowFormatInfo,
+            String nullLiteral,
+            List<String> predefinedFields,
+            List<String> fields) {
+        String[] fieldNames = rowFormatInfo.getFieldNames();
+        FormatInfo[] fieldFormatInfos = rowFormatInfo.getFieldFormatInfos();
+
+        int numPredefinedFields = predefinedFields.size();
+        int actualNumFields = numPredefinedFields + fields.size();
+        if (actualNumFields != fieldNames.length) {
+            LOG.warn("The number of fields mismatches: {} expected, but was {}.",
+                    fieldNames.length, actualNumFields);
+        }
+
+        Row row = new Row(fieldNames.length);
+
+        // Positions that have both a declared format and an incoming value.
+        int numFilledFields = Math.min(actualNumFields, fieldNames.length);
+        for (int i = 0; i < numFilledFields; ++i) {
+            String fieldText = i < numPredefinedFields
+                    ? predefinedFields.get(i)
+                    : fields.get(i - numPredefinedFields);
+
+            Object field =
+                    deserializeBasicField(
+                            fieldNames[i],
+                            fieldFormatInfos[i],
+                            fieldText,
+                            nullLiteral);
+            row.setField(i, field);
+        }
+
+        // Declared positions without an incoming value are explicitly set to null.
+        for (int i = numFilledFields; i < fieldNames.length; ++i) {
+            row.setField(i, null);
+        }
+
+        return row;
+    }
+}
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvValidator.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvValidator.java
new file mode 100644
index 0000000000..1e9f297004
--- /dev/null
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvValidator.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsgtlogcsv;
+
+import org.apache.inlong.sort.formats.base.TextFormatDescriptorValidator;
+
+import org.apache.flink.table.descriptors.DescriptorProperties;
+
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_DELIMITER;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.validateInLongMsgSchema;
+
+/**
+ * The validator for {@link InLongMsgTlogCsv}.
+ *
+ * <p>{@link #validate(DescriptorProperties)} runs three checks in order: the validation
+ * inherited from {@link TextFormatDescriptorValidator}, then a check that the
+ * {@code FORMAT_DELIMITER} property, which is optional, is a string of exactly one character
+ * when it is present, and finally {@code validateInLongMsgSchema} on the same properties.</p>
+ */
+public class InLongMsgTlogCsvValidator extends TextFormatDescriptorValidator {
+
+    @Override
+    public void validate(DescriptorProperties properties) {
+        super.validate(properties);
+
+        properties.validateString(FORMAT_DELIMITER, true, 1, 1);
+
+        validateInLongMsgSchema(properties);
+    }
+}
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
new file mode 100644
index 0000000000..a5531edfdb
--- /dev/null
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.inlong.sort.formats.inlongmsgtlogcsv.InLongMsgTlogCsvFormatFactory
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java
new file mode 100644
index 0000000000..4060fda847
--- /dev/null
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsgtlogcsv;
+
+import org.apache.inlong.common.msg.InLongMsg;
+import org.apache.inlong.sort.formats.common.FormatInfo;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.RowFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
+
+import org.apache.flink.api.common.functions.util.ListCollector;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_CHARSET;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_DELIMITER;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link InLongMsgTlogCsvFormatDeserializer}.
+ */
+public class InLongMsgTlogCsvFormatDeserializerTest {
+
+    private static final RowFormatInfo TEST_ROW_INFO =
+            new RowFormatInfo(
+                    new String[]{"f1", "f2", "f3", "f4", "f5"},
+                    new FormatInfo[]{
+                            IntFormatInfo.INSTANCE,
+                            IntFormatInfo.INSTANCE,
+                            StringFormatInfo.INSTANCE,
+                            StringFormatInfo.INSTANCE,
+                            StringFormatInfo.INSTANCE
+                    });
+
+    @Test
+    public void testExceptionHandler() throws Exception {
+        TestFailureHandler errorHandler = new TestFailureHandler();
+        InLongMsgTlogCsvFormatDeserializer deserializer =
+                new InLongMsgTlogCsvFormatDeserializer(
+                        TEST_ROW_INFO,
+                        DEFAULT_TIME_FIELD_NAME,
+                        DEFAULT_ATTRIBUTES_FIELD_NAME,
+                        DEFAULT_CHARSET,
+                        DEFAULT_DELIMITER,
+                        null,
+                        null,
+                        null,
+                        errorHandler);
+
+        InLongMsg inLongMsg1 = InLongMsg.newInLongMsg(true);
+        String attrs = "m=0&dt=1584806400000&__addcol1_=1&__addcol2_=test";
+        String body1 = "interfaceId1,field1,field2,field3";
+        String body2 = "interfaceId2,field1,field2,field3";
+        inLongMsg1.addMsg(attrs, body1.getBytes());
+        inLongMsg1.addMsg(attrs, body2.getBytes());
+
+        List<Row> actualRows = new ArrayList<>();
+        Collector<Row> collector = new ListCollector<>(actualRows);
+        deserializer.flatMap(inLongMsg1.buildArray(), collector);
+        assertEquals(2, errorHandler.getRowCount());
+
+        InLongMsg inLongMsg1Head = InLongMsg.newInLongMsg();
+        String abNormalAttrs = "m=0&__addcol1_=1&__addcol2_=test";
+        inLongMsg1Head.addMsg(abNormalAttrs, body1.getBytes());
+        inLongMsg1Head.addMsg(abNormalAttrs, body2.getBytes());
+        deserializer.flatMap(inLongMsg1Head.buildArray(), collector);
+        assertEquals(1, errorHandler.getHeadCount());
+    }
+
+    @Test
+    public void testNormal() throws Exception {
+        InLongMsg inLongMsg1 = InLongMsg.newInLongMsg(true);
+        String attrs = "m=0&dt=1584806400000&__addcol1_=1&__addcol2_=2";
+        String body1 = "interfaceId1,field1,field2,field3";
+        String body2 = "interfaceId2,field1,field2,field3";
+        inLongMsg1.addMsg(attrs, body1.getBytes());
+        inLongMsg1.addMsg(attrs, body2.getBytes());
+
+        Map<String, String> expectedAttributes = new HashMap<>();
+        expectedAttributes.put("m", "0");
+        expectedAttributes.put("dt", "1584806400000");
+        expectedAttributes.put("__addcol1_", "1");
+        expectedAttributes.put("__addcol2_", "2");
+
+        Row expectedRow1 = Row.of(
+                new Timestamp(1584806400000L),
+                expectedAttributes,
+                1,
+                2,
+                "field1",
+                "field2",
+                "field3");
+
+        Row expectedRow2 = Row.of(
+                new Timestamp(1584806400000L),
+                expectedAttributes,
+                1,
+                2,
+                "field1",
+                "field2",
+                "field3");
+
+        testRowDeserialization(
+                inLongMsg1.buildArray(),
+                Arrays.asList(expectedRow1, expectedRow2));
+    }
+
+    @Test
+    public void testDeleteHeadDelimiter() throws Exception {
+        InLongMsg inLongMsg1 = InLongMsg.newInLongMsg(true);
+        String attrs = "m=0&dt=1584806400000&__addcol1_=1&__addcol2_=2";
+        String body = ",interfaceId,field1,field2,field3";
+        inLongMsg1.addMsg(attrs, body.getBytes());
+
+        Map<String, String> expectedAttributes = new HashMap<>();
+        expectedAttributes.put("m", "0");
+        expectedAttributes.put("dt", "1584806400000");
+        expectedAttributes.put("__addcol1_", "1");
+        expectedAttributes.put("__addcol2_", "2");
+
+        Row expectedRow = Row.of(
+                new Timestamp(1584806400000L),
+                expectedAttributes,
+                1,
+                2,
+                "field1",
+                "field2",
+                "field3");
+
+        testRowDeserialization(
+                inLongMsg1.buildArray(),
+                Collections.singletonList(expectedRow));
+    }
+
+    @Test
+    public void testUnmatchedFields1() throws Exception {
+        InLongMsg inLongMsg1 = InLongMsg.newInLongMsg(true);
+        String attrs = "m=0&dt=1584806400000&__addcol1_=1&__addcol2_=2";
+        String body1 = "interfaceId1,field1,field2";
+        String body2 = "interfaceId2,field1,field2,field3,field4";
+        inLongMsg1.addMsg(attrs, body1.getBytes());
+        inLongMsg1.addMsg(attrs, body2.getBytes());
+
+        Map<String, String> expectedAttributes = new HashMap<>();
+        expectedAttributes.put("m", "0");
+        expectedAttributes.put("dt", "1584806400000");
+        expectedAttributes.put("__addcol1_", "1");
+        expectedAttributes.put("__addcol2_", "2");
+
+        Row expectedRow1 = Row.of(
+                new Timestamp(1584806400000L),
+                expectedAttributes,
+                1,
+                2,
+                "field1",
+                "field2",
+                null);
+
+        Row expectedRow2 = Row.of(
+                new Timestamp(1584806400000L),
+                expectedAttributes,
+                1,
+                2,
+                "field1",
+                "field2",
+                "field3");
+
+        testRowDeserialization(
+                inLongMsg1.buildArray(),
+                Arrays.asList(expectedRow1, expectedRow2));
+    }
+
+    @Test
+    public void testUnmatchedFields2() throws Exception {
+        InLongMsg inLongMsg1 = InLongMsg.newInLongMsg(true);
+        String attrs = "m=0&dt=1584806400000&__addcol1_=1&__addcol2_=2&" +
+                "__addcol3_=3&__addcol4_=4&__addcol5_=5&__addcol6_=6";
+        String body = "interfaceId1,field1,field2,field3";
+        inLongMsg1.addMsg(attrs, body.getBytes());
+
+        Map<String, String> expectedAttributes = new HashMap<>();
+        expectedAttributes.put("m", "0");
+        expectedAttributes.put("dt", "1584806400000");
+        expectedAttributes.put("__addcol1_", "1");
+        expectedAttributes.put("__addcol2_", "2");
+        expectedAttributes.put("__addcol3_", "3");
+        expectedAttributes.put("__addcol4_", "4");
+        expectedAttributes.put("__addcol5_", "5");
+        expectedAttributes.put("__addcol6_", "6");
+
+        Row expectedRow = Row.of(
+                new Timestamp(1584806400000L),
+                expectedAttributes,
+                1,
+                2,
+                "3",
+                "4",
+                "5");
+
+        testRowDeserialization(
+                inLongMsg1.buildArray(),
+                Collections.singletonList(expectedRow));
+    }
+
+    private void testRowDeserialization(
+            byte[] bytes,
+            List<Row> expectedRows) throws Exception {
+        InLongMsgTlogCsvFormatDeserializer deserializer =
+                new InLongMsgTlogCsvFormatDeserializer.Builder(TEST_ROW_INFO)
+                        .setTimeFieldName("inlongmsg_time")
+                        .setAttributesFieldName("inlongmsg_attributes")
+                        .build();
+
+        List<Row> actualRows = new ArrayList<>();
+        Collector<Row> collector = new ListCollector<>(actualRows);
+
+        deserializer.flatMap(bytes, collector);
+
+        assertEquals(expectedRows, actualRows);
+    }
+
+    private static class TestFailureHandler implements FailureHandler {
+
+        private int headCount = 0;
+        private int bodyCount = 0;
+        private int rowCount = 0;
+
+        public int getHeadCount() {
+            return headCount;
+        }
+
+        public int getBodyCount() {
+            return bodyCount;
+        }
+
+        public int getRowCount() {
+            return rowCount;
+        }
+
+        @Override
+        public void onParsingHeadFailure(String attribute, Exception 
exception) throws Exception {
+            headCount++;
+        }
+
+        @Override
+        public void onParsingBodyFailure(byte[] body, Exception exception) 
throws Exception {
+            bodyCount++;
+        }
+
+        @Override
+        public void onConvertingRowFailure(InLongMsgHead head,
+                InLongMsgBody body, Exception exception) throws Exception {
+            rowCount++;
+        }
+    }
+}
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatFactoryTest.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatFactoryTest.java
new file mode 100644
index 0000000000..10e2fbcbcd
--- /dev/null
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatFactoryTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsgtlogcsv;
+
+import org.apache.inlong.sort.formats.base.TableFormatDeserializer;
+import org.apache.inlong.sort.formats.base.TableFormatForRowUtils;
+import org.apache.inlong.sort.formats.common.DateFormatInfo;
+import org.apache.inlong.sort.formats.common.FormatInfo;
+import org.apache.inlong.sort.formats.common.FormatUtils;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.RowFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.Types;
+import org.apache.flink.table.descriptors.Schema;
+import org.apache.flink.types.Row;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Tests for {@link InLongMsgTlogCsvFormatFactory}.
+ *
+ * <p>{@code testCreateTableFormatDeserializer} turns an {@link InLongMsgTlogCsv} descriptor
+ * (explicit schema, ';' delimiter, ISO-8859-1 charset, backslash escape, double-quote quote and
+ * "null" as null literal) into properties and asserts that the deserializer obtained through
+ * {@code TableFormatForRowUtils.getTableFormatDeserializer} equals one constructed directly with
+ * the same settings and the default time and attributes field names.</p>
+ *
+ * <p>{@code testCreateTableFormatDeserializerWithDerivation} combines a table schema with a
+ * descriptor that derives its schema and passes only if an {@link Exception} is thrown. The
+ * expected type is broad, so an exception from any statement of the method satisfies it; the
+ * final assertion only runs when nothing was thrown, in which case JUnit fails the test
+ * regardless of the assertion's outcome.</p>
+ */
+public class InLongMsgTlogCsvFormatFactoryTest {
+
+    private static final TypeInformation<Row> SCHEMA =
+            Types.ROW(
+                    new String[]{"time", "attributes", "student_name", 
"score", "date"},
+                    new TypeInformation[]{
+                            Types.SQL_TIMESTAMP(),
+                            Types.MAP(Types.STRING(), Types.STRING()),
+                            Types.STRING(),
+                            Types.INT(),
+                            Types.SQL_DATE()
+                    });
+
+    private static final RowFormatInfo TEST_FORMAT_SCHEMA =
+            new RowFormatInfo(
+                    new String[]{"student_name", "score", "date"},
+                    new FormatInfo[]{
+                            StringFormatInfo.INSTANCE,
+                            IntFormatInfo.INSTANCE,
+                            new DateFormatInfo("yyyy-MM-dd")
+                    });
+
+    @Test
+    public void testCreateTableFormatDeserializer() throws Exception {
+        final Map<String, String> properties =
+                new InLongMsgTlogCsv()
+                        .schema(FormatUtils.marshall(TEST_FORMAT_SCHEMA))
+                        .delimiter(';')
+                        .charset(StandardCharsets.ISO_8859_1)
+                        .escapeCharacter('\\')
+                        .quoteCharacter('\"')
+                        .nullLiteral("null")
+                        .toProperties();
+        assertNotNull(properties);
+
+        final InLongMsgTlogCsvFormatDeserializer expectedDeser =
+                new InLongMsgTlogCsvFormatDeserializer(
+                        TEST_FORMAT_SCHEMA,
+                        InLongMsgUtils.DEFAULT_TIME_FIELD_NAME,
+                        InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME,
+                        StandardCharsets.ISO_8859_1.name(),
+                        ';',
+                        '\\',
+                        '\"',
+                        "null",
+                        false); // NOTE(review): presumably the ignoreErrors flag - confirm against the constructor
+
+        final TableFormatDeserializer actualDeser =
+                TableFormatForRowUtils.getTableFormatDeserializer(
+                        properties,
+                        getClass().getClassLoader());
+
+        assertEquals(expectedDeser, actualDeser);
+    }
+
+    @Test(expected = Exception.class)
+    public void testCreateTableFormatDeserializerWithDerivation() {
+        final Map<String, String> properties = new HashMap<>();
+        properties.putAll(
+                new Schema()
+                        .schema(TableSchema.fromTypeInfo(SCHEMA))
+                        .toProperties());
+        properties.putAll(new 
InLongMsgTlogCsv().deriveSchema().toProperties());
+
+        final InLongMsgTlogCsvFormatDeserializer expectedDeser =
+                new 
InLongMsgTlogCsvFormatDeserializer.Builder(TEST_FORMAT_SCHEMA).build();
+
+        final TableFormatDeserializer actualDeser =
+                TableFormatForRowUtils.getTableFormatDeserializer(
+                        properties,
+                        getClass().getClassLoader());
+
+        assertEquals(expectedDeser, actualDeser);
+    }
+}
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvTest.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvTest.java
new file mode 100644
index 0000000000..ff2642b0e4
--- /dev/null
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsgtlogcsv;
+
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgValidator;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.descriptors.Descriptor;
+import org.apache.flink.table.descriptors.DescriptorTestBase;
+import org.apache.flink.table.descriptors.DescriptorValidator;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tests for the {@link InLongMsgTlogCsv} descriptor.
+ */
+public class InLongMsgTlogCsvTest extends DescriptorTestBase {
+
+    private static final String TEST_SCHEMA =
+            "{" +
+                    "\"type\":\"row\"," +
+                    "\"fieldFormats\":[{" +
+                    "\"name\":\"student_name\"," +
+                    "\"format\":{\"type\":\"string\"}" +
+                    "},{" +
+                    "\"name\":\"score\"," +
+                    "\"format\":{\"type\":\"int\"}" +
+                    "},{" +
+                    "\"name\":\"date\"," +
+                    "\"format\":{" +
+                    "\"type\":\"date\"," +
+                    "\"format\":\"yyyy-MM-dd\"" +
+                    "}" +
+                    "}]" +
+                    "}";
+
+    private static final Descriptor CUSTOM_DESCRIPTOR_WITH_SCHEMA =
+            new InLongMsgTlogCsv()
+                    .schema(TEST_SCHEMA)
+                    .timeFieldName("time")
+                    .attributesFieldName("attributes")
+                    .delimiter(';')
+                    .charset(StandardCharsets.ISO_8859_1)
+                    .escapeCharacter('\\')
+                    .quoteCharacter('\"')
+                    .nullLiteral("n/a")
+                    .ignoreErrors();
+
+    @Test(expected = ValidationException.class)
+    public void testInvalidIgnoreParseErrors() {
+        addPropertyAndVerify(CUSTOM_DESCRIPTOR_WITH_SCHEMA, "format.escape-character", "DDD");
+    }
+
+    @Test(expected = ValidationException.class)
+    public void testMissingSchema() {
+        removePropertyAndVerify(CUSTOM_DESCRIPTOR_WITH_SCHEMA, "format.schema");
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    @Override
+    public List<Descriptor> descriptors() {
+        return Collections.singletonList(CUSTOM_DESCRIPTOR_WITH_SCHEMA);
+    }
+
+    @Override
+    public List<Map<String, String>> properties() {
+        final Map<String, String> props1 = new HashMap<>();
+        props1.put("format.type", "inlong-msg-tlogcsv");
+        props1.put("format.property-version", "1");
+        props1.put("format.schema", TEST_SCHEMA);
+        props1.put("format.time-field-name", "time");
+        props1.put("format.attributes-field-name", "attributes");
+        props1.put("format.delimiter", ";");
+        props1.put("format.charset", "ISO-8859-1");
+        props1.put("format.escape-character", "\\");
+        props1.put("format.quote-character", "\"");
+        props1.put("format.null-literal", "n/a");
+        props1.put("format.ignore-errors", "true");
+
+        return Collections.singletonList(props1);
+    }
+
+    @Override
+    public DescriptorValidator validator() {
+        return new InLongMsgValidator();
+    }
+}
diff --git a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/resources/log4j-test.properties b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000000..881dc0609b
--- /dev/null
+++ b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+# Set root logger level to OFF and its only appender to A1.
+log4j.rootLogger=OFF, A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
diff --git a/inlong-sort/sort-formats/format-row/pom.xml b/inlong-sort/sort-formats/format-row/pom.xml
index 1fbff77f1f..2e4f6b1b8b 100644
--- a/inlong-sort/sort-formats/format-row/pom.xml
+++ b/inlong-sort/sort-formats/format-row/pom.xml
@@ -41,6 +41,7 @@
         <module>format-inlongmsg-kv</module>
         <module>format-inlongmsg-pb</module>
         <module>format-inlongmsg-tlogkv</module>
+        <module>format-inlongmsg-tlogcsv</module>
     </modules>
 
     <properties>

Reply via email to