This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 29574c04cf [INLONG-9774][Sort] Support rowdata way of sort InLong 
message tlog-kv format (#9780)
29574c04cf is described below

commit 29574c04cfca4b6f651b6bd35c8dfdba30b25d89
Author: baomingyu <[email protected]>
AuthorDate: Thu Mar 7 16:47:07 2024 +0800

    [INLONG-9774][Sort] Support rowdata way of sort InLong message tlog-kv 
format (#9780)
---
 .../sort/formats/inlongmsg/InLongMsgUtils.java     |  35 +++-
 .../InLongMsgTlogCsvFormatDeserializer.java        |  13 +-
 .../inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java    |  26 +--
 .../InLongMsgTlogCsvFormatDeserializerTest.java    |  23 ++-
 .../format-inlongmsg-rowdata-tlogkv/pom.xml        | 136 +++++++++++++
 .../formats/inlongmsgtlogkv/InLongMsgTlogKv.java   |  96 +++++++++
 .../InLongMsgTlogKvFormatDeserializer.java}        | 144 ++++++++-----
 .../inlongmsgtlogkv/InLongMsgTlogKvUtils.java}     |  98 ++++-----
 .../inlongmsgtlogkv/InLongMsgTlogKvValidator.java  |  44 ++++
 .../org.apache.flink.table.factories.TableFactory  |  16 ++
 .../InLongMsgTlogKvFormatDeserializerTest.java     | 223 +++++++++++++++++++++
 .../inlongmsgtlogkv/InLongMsgTlogKvTest.java       | 110 ++++++++++
 .../src/test/resources/log4j-test.properties       |  27 +++
 inlong-sort/sort-formats/format-rowdata/pom.xml    |   1 +
 inlong-sort/sort-formats/pom.xml                   |   1 +
 15 files changed, 858 insertions(+), 135 deletions(-)

diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
index ce0b5c9dbe..1fd82e0d82 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
@@ -33,7 +33,6 @@ import org.apache.flink.table.types.logical.MapType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.table.types.logical.VarCharType;
-import org.apache.flink.types.Row;
 
 import javax.annotation.Nullable;
 
@@ -229,6 +228,35 @@ public class InLongMsgUtils {
         return InternalTypeInfo.of(rowType);
     }
 
+    public static TypeInformation<RowData> 
decorateRowDataTypeWithNeededHeadFields(@Nullable String timeFieldName,
+            @Nullable String attributesFieldName,
+            RowType dataRowType) {
+        List<String> fieldNames = new ArrayList<>();
+        List<LogicalType> fieldTypes = new ArrayList<>();
+
+        // Timestamp and attribute field
+        if (timeFieldName != null) {
+            fieldNames.add(timeFieldName);
+            fieldTypes.add(new TimestampType());
+        }
+
+        if (attributesFieldName != null) {
+            fieldNames.add(attributesFieldName);
+            fieldTypes.add(new MapType(new VarCharType(), new VarCharType()));
+        }
+
+        // Physical fields
+        List<RowType.RowField> fields = dataRowType.getFields();
+        for (RowType.RowField field : fields) {
+            fieldNames.add(field.getName());
+            fieldTypes.add(field.getType());
+        }
+
+        RowType rowType = RowType.of(fieldTypes.toArray(new LogicalType[0]), 
fieldNames.toArray(new String[0]));
+
+        return InternalTypeInfo.of(rowType);
+    }
+
     /**
      * Creates the type information with given field names and data schema.
      *
@@ -323,8 +351,7 @@ public class InLongMsgUtils {
             @Nullable String attributesFieldName,
             Timestamp time,
             Map<String, String> attributes,
-            Row dataRow,
-            FieldToRowDataConverters.FieldToRowDataConverter[] converters) {
+            GenericRowData dataRow) {
         List<Object> headFields = new ArrayList<>();
         if (timeFieldName != null) {
             headFields.add(TIME_FIELD_CONVERTER.convert(time));
@@ -340,7 +367,7 @@ public class InLongMsgUtils {
         }
 
         for (int i = 0; i < dataRow.getArity(); ++i) {
-            rowData.setField(i + headFields.size(), 
converters[i].convert(dataRow.getField(i)));
+            rowData.setField(i + headFields.size(), dataRow.getField(i));
         }
 
         return rowData;
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java
index 32f2edb7ca..3a9f4c001b 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java
@@ -31,7 +31,6 @@ import 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.types.Row;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -178,20 +177,20 @@ public final class InLongMsgTlogCsvFormatDeserializer 
extends AbstractInLongMsgF
 
     @Override
     protected List<RowData> convertRowDataList(InLongMsgHead head, 
InLongMsgBody body) throws Exception {
-        Row dataRow =
-                InLongMsgTlogCsvUtils.deserializeRow(
+        GenericRowData dataRow =
+                InLongMsgTlogCsvUtils.deserializeRowData(
                         rowFormatInfo,
                         nullLiteral,
                         head.getPredefinedFields(),
-                        body.getFields());
+                        body.getFields(),
+                        converters);
 
-        GenericRowData genericRowData = (GenericRowData) 
InLongMsgUtils.decorateRowWithNeededHeadFields(
+        GenericRowData genericRowData = (GenericRowData) 
InLongMsgUtils.decorateRowDataWithNeededHeadFields(
                 timeFieldName,
                 attributesFieldName,
                 head.getTime(),
                 head.getAttributes(),
-                dataRow,
-                converters);
+                dataRow);
 
         return 
Collections.singletonList(InLongMsgUtils.decorateRowWithMetaData(genericRowData,
 head, metadataKeys));
     }
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
index 2bf3ad6932..19c77fca7d 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
@@ -17,12 +17,13 @@
 
 package org.apache.inlong.sort.formats.inlongmsgtlogcsv;
 
+import org.apache.inlong.sort.formats.base.FieldToRowDataConverters;
 import org.apache.inlong.sort.formats.common.FormatInfo;
 import org.apache.inlong.sort.formats.common.RowFormatInfo;
 import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
 import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
 
-import org.apache.flink.types.Row;
+import org.apache.flink.table.data.GenericRowData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -105,11 +106,12 @@ public class InLongMsgTlogCsvUtils {
      * @param fields The fields.
      * @return The row deserialized from the row.
      */
-    public static Row deserializeRow(
+    public static GenericRowData deserializeRowData(
             RowFormatInfo rowFormatInfo,
             String nullLiteral,
             List<String> predefinedFields,
-            List<String> fields) {
+            List<String> fields,
+            FieldToRowDataConverters.FieldToRowDataConverter[] converters) {
         String[] fieldNames = rowFormatInfo.getFieldNames();
         FormatInfo[] fieldFormatInfos = rowFormatInfo.getFieldFormatInfos();
 
@@ -119,7 +121,7 @@ public class InLongMsgTlogCsvUtils {
                     " expected, but was " + actualNumFields + ".");
         }
 
-        Row row = new Row(fieldNames.length);
+        GenericRowData rowData = new GenericRowData(fieldNames.length);
 
         for (int i = 0; i < predefinedFields.size(); ++i) {
 
@@ -133,12 +135,12 @@ public class InLongMsgTlogCsvUtils {
             String fieldText = predefinedFields.get(i);
 
             Object field =
-                    deserializeBasicField(
+                    converters[i].convert(deserializeBasicField(
                             fieldName,
                             fieldFormatInfo,
                             fieldText,
-                            nullLiteral);
-            row.setField(i, field);
+                            nullLiteral));
+            rowData.setField(i, field);
         }
 
         for (int i = 0; i < fields.size(); ++i) {
@@ -153,18 +155,18 @@ public class InLongMsgTlogCsvUtils {
             String fieldText = fields.get(i);
 
             Object field =
-                    deserializeBasicField(
+                    converters[i + 
predefinedFields.size()].convert(deserializeBasicField(
                             fieldName,
                             fieldFormatInfo,
                             fieldText,
-                            nullLiteral);
-            row.setField(i + predefinedFields.size(), field);
+                            nullLiteral));
+            rowData.setField(i + predefinedFields.size(), field);
         }
 
         for (int i = predefinedFields.size() + fields.size(); i < 
fieldNames.length; ++i) {
-            row.setField(i, null);
+            rowData.setField(i, null);
         }
 
-        return row;
+        return rowData;
     }
 }
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java
index 65423431be..ee78cd9ce2 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java
@@ -44,7 +44,6 @@ import org.junit.Test;
 
 import java.sql.Timestamp;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -147,9 +146,11 @@ public class InLongMsgTlogCsvFormatDeserializerTest {
         expectRowData2.setField(5, StringData.fromString("field2"));
         expectRowData2.setField(6, StringData.fromString("field3"));
 
-        testRowDeserialization(
-                inLongMsg1.buildArray(),
-                Arrays.asList(expectRowData1, expectRowData2));
+        List expList = new ArrayList<>();
+        expList.add(expectRowData1);
+        expList.add(expectRowData2);
+
+        testRowDeserialization(inLongMsg1.buildArray(), expList);
     }
 
     @Test
@@ -174,9 +175,10 @@ public class InLongMsgTlogCsvFormatDeserializerTest {
         expectRowData.setField(5, StringData.fromString("field2"));
         expectRowData.setField(6, StringData.fromString("field3"));
 
+        List expList = new ArrayList<>();
+        expList.add(expectRowData);
         testRowDeserialization(
-                inLongMsg1.buildArray(),
-                Collections.singletonList(expectRowData));
+                inLongMsg1.buildArray(), expList);
     }
 
     @Test
@@ -211,10 +213,11 @@ public class InLongMsgTlogCsvFormatDeserializerTest {
         expectRowData2.setField(4, StringData.fromString("field1"));
         expectRowData2.setField(5, StringData.fromString("field2"));
         expectRowData2.setField(6, StringData.fromString("field3"));
-
+        List expList = new ArrayList<>();
+        expList.add(expectRowData1);
+        expList.add(expectRowData2);
         testRowDeserialization(
-                inLongMsg1.buildArray(),
-                Arrays.asList(expectRowData1, expectRowData2));
+                inLongMsg1.buildArray(), expList);
     }
 
     @Test
@@ -294,9 +297,7 @@ public class InLongMsgTlogCsvFormatDeserializerTest {
 
         List<RowData> actualRows = new ArrayList<>();
         Collector<RowData> collector = new ListCollector<>(actualRows);
-
         deserializer.flatMap(bytes, collector);
-
         assertEquals(expectedRows, actualRows);
     }
 
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/pom.xml
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/pom.xml
new file mode 100644
index 0000000000..2d969bb54d
--- /dev/null
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/pom.xml
@@ -0,0 +1,136 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.inlong</groupId>
+        <artifactId>format-rowdata</artifactId>
+        <version>1.12.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>format-inlongmsg-rowdata-tlogkv</artifactId>
+
+    <packaging>jar</packaging>
+    <name>Apache InLong - Sort Format-InLongMsg-Rowdata-TLogKV</name>
+
+    <properties>
+        
<inlong.root.dir>${project.parent.parent.parent.parent.basedir}</inlong.root.dir>
+    </properties>
+
+    <dependencies>
+
+        <!-- core dependencies -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-format-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-format-rowdata-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-format-inlongmsg-rowdata-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.xerial.snappy</groupId>
+            <artifactId>snappy-java</artifactId>
+            <version>${snappy.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-shaded-jackson</artifactId>
+            <version>${flink.jackson.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- test dependencies -->
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-runtime</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${flink.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+
+    <profiles>
+        <!-- Create SQL Client uber jars by default -->
+        <profile>
+            <id>sql-jars</id>
+            <activation>
+                <property>
+                    <name>!skipSqlJars</name>
+                </property>
+            </activation>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-jar-plugin</artifactId>
+                        <executions>
+                            <execution>
+                                <goals>
+                                    <goal>jar</goal>
+                                </goals>
+                                <phase>package</phase>
+                                <configuration>
+                                    <classifier>sql-jar</classifier>
+                                </configuration>
+                            </execution>
+                        </executions>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
+</project>
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKv.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKv.java
new file mode 100644
index 0000000000..2a66e15680
--- /dev/null
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKv.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsgtlogkv;
+
+import org.apache.inlong.sort.formats.base.TextFormatDescriptor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_ATTRIBUTE_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_DELIMITER;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_ENTRY_DELIMITER;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_KV_DELIMITER;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_TIME_FIELD_NAME;
+
+/**
+ * Format descriptor for InLongMsgTlogKVs.
+ */
+public class InLongMsgTlogKv extends TextFormatDescriptor<InLongMsgTlogKv> {
+
+    public static final String FORMAT_TYPE_VALUE = "inlong-msg-tlogkv";
+
+    public InLongMsgTlogKv() {
+        super(FORMAT_TYPE_VALUE, 1);
+    }
+
+    /**
+     * Sets the delimiter character (',' by default).
+     *
+     * @param delimiter the field delimiter character.
+     * @return The instance which has delimiter set.
+     */
+    public InLongMsgTlogKv delimiter(char delimiter) {
+        internalProperties.putCharacter(FORMAT_DELIMITER, delimiter);
+        return this;
+    }
+
+    /**
+     * Sets the entry delimiter character ('&amp;' by default).
+     *
+     * @param delimiter the entry delimiter character.
+     * @return The instance which has delimiter set.
+     */
+    public InLongMsgTlogKv entryDelimiter(char delimiter) {
+        internalProperties.putCharacter(FORMAT_ENTRY_DELIMITER, delimiter);
+        return this;
+    }
+
+    /**
+     * Sets the kv delimiter character ('=' by default).
+     *
+     * @param delimiter the kv delimiter character.
+     * @return The instance which has delimiter set.
+     */
+    public InLongMsgTlogKv kvDelimiter(char delimiter) {
+        internalProperties.putCharacter(FORMAT_KV_DELIMITER, delimiter);
+        return this;
+    }
+
+    /**
+     * Sets the name of the time field.
+     *
+     * @param timeFieldName The name of the time field.
+     * @return The instance which has time field name set.
+     */
+    public InLongMsgTlogKv timeFieldName(String timeFieldName) {
+        checkNotNull(timeFieldName);
+        internalProperties.putString(FORMAT_TIME_FIELD_NAME, timeFieldName);
+        return this;
+    }
+
+    /**
+     * Sets the name of the attributes field.
+     *
+     * @param attributesFieldName The name of the attributes field.
+     * @return The instance which has attributes field name set.
+     */
+    public InLongMsgTlogKv attributesFieldName(String attributesFieldName) {
+        checkNotNull(attributesFieldName);
+        internalProperties.putString(FORMAT_ATTRIBUTE_FIELD_NAME, 
attributesFieldName);
+        return this;
+    }
+}
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializer.java
similarity index 59%
copy from 
inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java
copy to 
inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializer.java
index 32f2edb7ca..2b8ad4c07c 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializer.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.formats.inlongmsgtlogcsv;
+package org.apache.inlong.sort.formats.inlongmsgtlogkv;
 
 import org.apache.inlong.sort.formats.base.FieldToRowDataConverters;
 import 
org.apache.inlong.sort.formats.base.FieldToRowDataConverters.FieldToRowDataConverter;
@@ -31,7 +31,8 @@ import 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.types.Row;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.types.logical.RowType;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -42,13 +43,21 @@ import java.util.List;
 import java.util.Objects;
 
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_DELIMITER;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_ENTRY_DELIMITER;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_KV_DELIMITER;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_ATTRIBUTE_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_DELIMITER;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_ENTRY_DELIMITER;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_KV_DELIMITER;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_TIME_FIELD_NAME;
 import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
 import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsgtlogkv.InLongMsgTlogKvUtils.DEFAULT_INLONGMSG_TLOGKV_CHARSET;
 
 /**
- * The deserializer for the records in InLongMsgTlogCsv format.
+ * The deserializer for the records in InLongMsgTlogKV format.
  */
-public final class InLongMsgTlogCsvFormatDeserializer extends 
AbstractInLongMsgFormatDeserializer {
+public final class InLongMsgTlogKvFormatDeserializer extends 
AbstractInLongMsgFormatDeserializer {
 
     private static final long serialVersionUID = 1L;
 
@@ -73,6 +82,7 @@ public final class InLongMsgTlogCsvFormatDeserializer extends 
AbstractInLongMsgF
     /**
      * The charset of the text.
      */
+    @Nonnull
     private final String charset;
 
     /**
@@ -81,6 +91,18 @@ public final class InLongMsgTlogCsvFormatDeserializer 
extends AbstractInLongMsgF
     @Nonnull
     private final Character delimiter;
 
+    /**
+     * The delimiter between entries.
+     */
+    @Nonnull
+    private final Character entryDelimiter;
+
+    /**
+     * The delimiter between key and value.
+     */
+    @Nonnull
+    private final Character kvDelimiter;
+
     /**
      * Escape character. Null if escaping is disabled.
      */
@@ -99,20 +121,19 @@ public final class InLongMsgTlogCsvFormatDeserializer 
extends AbstractInLongMsgF
     @Nullable
     private final String nullLiteral;
 
-    private final List<String> metadataKeys;
-
     private final FieldToRowDataConverter[] converters;
 
-    public InLongMsgTlogCsvFormatDeserializer(
+    public InLongMsgTlogKvFormatDeserializer(
             @Nonnull RowFormatInfo rowFormatInfo,
             @Nullable String timeFieldName,
             @Nullable String attributesFieldName,
             @Nonnull String charset,
             @Nonnull Character delimiter,
+            @Nonnull Character entryDelimiter,
+            @Nonnull Character kvDelimiter,
             @Nullable Character escapeChar,
             @Nullable Character quoteChar,
             @Nullable String nullLiteral,
-            List<String> metadataKeys,
             @Nonnull Boolean ignoreErrors) {
         this(
                 rowFormatInfo,
@@ -120,23 +141,25 @@ public final class InLongMsgTlogCsvFormatDeserializer 
extends AbstractInLongMsgF
                 attributesFieldName,
                 charset,
                 delimiter,
+                entryDelimiter,
+                kvDelimiter,
                 escapeChar,
                 quoteChar,
                 nullLiteral,
-                metadataKeys,
                 InLongMsgUtils.getDefaultExceptionHandler(ignoreErrors));
     }
 
-    public InLongMsgTlogCsvFormatDeserializer(
+    public InLongMsgTlogKvFormatDeserializer(
             @Nonnull RowFormatInfo rowFormatInfo,
             @Nullable String timeFieldName,
             @Nullable String attributesFieldName,
             @Nonnull String charset,
             @Nonnull Character delimiter,
+            @Nonnull Character entryDelimiter,
+            @Nonnull Character kvDelimiter,
             @Nullable Character escapeChar,
             @Nullable Character quoteChar,
             @Nullable String nullLiteral,
-            List<String> metadataKeys,
             @Nonnull FailureHandler failureHandler) {
         super(failureHandler);
 
@@ -145,69 +168,76 @@ public final class InLongMsgTlogCsvFormatDeserializer 
extends AbstractInLongMsgF
         this.attributesFieldName = attributesFieldName;
         this.charset = charset;
         this.delimiter = delimiter;
+        this.entryDelimiter = entryDelimiter;
+        this.kvDelimiter = kvDelimiter;
         this.escapeChar = escapeChar;
         this.quoteChar = quoteChar;
         this.nullLiteral = nullLiteral;
-        this.metadataKeys = metadataKeys;
 
-        converters = Arrays.stream(rowFormatInfo.getFieldFormatInfos())
+        this.converters = Arrays.stream(rowFormatInfo.getFieldFormatInfos())
                 .map(formatInfo -> FieldToRowDataConverters.createConverter(
                         TableFormatUtils.deriveLogicalType(formatInfo)))
-                .toArray(FieldToRowDataConverter[]::new);
+                
.toArray(FieldToRowDataConverters.FieldToRowDataConverter[]::new);
     }
 
     @Override
     public TypeInformation<RowData> getProducedType() {
-        return InLongMsgUtils.decorateRowTypeWithNeededHeadFieldsAndMetadata(
-                timeFieldName,
-                attributesFieldName,
-                rowFormatInfo,
-                metadataKeys);
+        return 
InLongMsgUtils.decorateRowDataTypeWithNeededHeadFields(timeFieldName,
+                attributesFieldName, (RowType) 
TableFormatUtils.deriveLogicalType(rowFormatInfo));
     }
 
     @Override
     protected InLongMsgHead parseHead(String attr) throws Exception {
-        return InLongMsgTlogCsvUtils.parseHead(attr);
+        return InLongMsgTlogKvUtils.parseHead(attr);
     }
 
     @Override
     protected List<InLongMsgBody> parseBodyList(byte[] bytes) throws Exception 
{
         return Collections.singletonList(
-                InLongMsgTlogCsvUtils.parseBody(bytes, charset, delimiter, 
escapeChar, quoteChar));
+                InLongMsgTlogKvUtils.parseBody(
+                        bytes,
+                        charset,
+                        delimiter,
+                        entryDelimiter,
+                        kvDelimiter,
+                        escapeChar,
+                        quoteChar));
     }
 
     @Override
     protected List<RowData> convertRowDataList(InLongMsgHead head, 
InLongMsgBody body) throws Exception {
-        Row dataRow =
-                InLongMsgTlogCsvUtils.deserializeRow(
+        GenericRowData genericRowData =
+                InLongMsgTlogKvUtils.deserializeRowData(
                         rowFormatInfo,
                         nullLiteral,
                         head.getPredefinedFields(),
-                        body.getFields());
+                        body.getEntries(), converters);
 
-        GenericRowData genericRowData = (GenericRowData) 
InLongMsgUtils.decorateRowWithNeededHeadFields(
+        RowData rowData = InLongMsgUtils.decorateRowWithNeededHeadFields(
                 timeFieldName,
                 attributesFieldName,
                 head.getTime(),
                 head.getAttributes(),
-                dataRow,
-                converters);
+                genericRowData);
 
-        return 
Collections.singletonList(InLongMsgUtils.decorateRowWithMetaData(genericRowData,
 head, metadataKeys));
+        return Collections.singletonList(rowData);
     }
 
     /**
-     * The builder for {@link InLongMsgTlogCsvFormatDeserializer}.
+     * The builder for {@link InLongMsgTlogKvFormatDeserializer}.
      */
     public static class Builder extends TextFormatBuilder<Builder> {
 
         private String timeFieldName = DEFAULT_TIME_FIELD_NAME;
         private String attributesFieldName = DEFAULT_ATTRIBUTES_FIELD_NAME;
         private Character delimiter = DEFAULT_DELIMITER;
-        private List<String> metadataKeys = Collections.emptyList();
+        private Character entryDelimiter = DEFAULT_ENTRY_DELIMITER;
+        private Character kvDelimiter = DEFAULT_KV_DELIMITER;
 
         public Builder(RowFormatInfo rowFormatInfo) {
             super(rowFormatInfo);
+
+            this.charset = DEFAULT_INLONGMSG_TLOGKV_CHARSET;
         }
 
         public Builder setTimeFieldName(String timeFieldName) {
@@ -225,56 +255,80 @@ public final class InLongMsgTlogCsvFormatDeserializer 
extends AbstractInLongMsgF
             return this;
         }
 
-        public Builder setMetadataKeys(List<String> metadataKeys) {
-            this.metadataKeys = metadataKeys;
+        public Builder setEntryDelimiter(Character entryDelimiter) {
+            this.entryDelimiter = entryDelimiter;
+            return this;
+        }
+
+        public Builder setKvDelimiter(Character kvDelimiter) {
+            this.kvDelimiter = kvDelimiter;
+            return this;
+        }
+
+        public Builder configure(DescriptorProperties descriptorProperties) {
+            super.configure(descriptorProperties);
+
+            descriptorProperties.getOptionalString(FORMAT_TIME_FIELD_NAME)
+                    .ifPresent(this::setTimeFieldName);
+            descriptorProperties.getOptionalString(FORMAT_ATTRIBUTE_FIELD_NAME)
+                    .ifPresent(this::setAttributesFieldName);
+            descriptorProperties.getOptionalCharacter(FORMAT_DELIMITER)
+                    .ifPresent(this::setDelimiter);
+            descriptorProperties.getOptionalCharacter(FORMAT_ENTRY_DELIMITER)
+                    .ifPresent(this::setEntryDelimiter);
+            descriptorProperties.getOptionalCharacter(FORMAT_KV_DELIMITER)
+                    .ifPresent(this::setKvDelimiter);
+
             return this;
         }
 
-        public InLongMsgTlogCsvFormatDeserializer build() {
-            return new InLongMsgTlogCsvFormatDeserializer(
+        public InLongMsgTlogKvFormatDeserializer build() {
+            return new InLongMsgTlogKvFormatDeserializer(
                     rowFormatInfo,
                     timeFieldName,
                     attributesFieldName,
                     charset,
                     delimiter,
+                    entryDelimiter,
+                    kvDelimiter,
                     escapeChar,
                     quoteChar,
                     nullLiteral,
-                    metadataKeys,
                     ignoreErrors);
         }
     }
 
     @Override
-    public boolean equals(Object o) {
-        if (this == o) {
+    public boolean equals(Object object) {
+        if (this == object) {
             return true;
         }
 
-        if (o == null || getClass() != o.getClass()) {
+        if (object == null || getClass() != object.getClass()) {
             return false;
         }
 
-        if (!super.equals(o)) {
+        if (!super.equals(object)) {
             return false;
         }
 
-        InLongMsgTlogCsvFormatDeserializer that = 
(InLongMsgTlogCsvFormatDeserializer) o;
+        InLongMsgTlogKvFormatDeserializer that = 
(InLongMsgTlogKvFormatDeserializer) object;
         return rowFormatInfo.equals(that.rowFormatInfo) &&
                 Objects.equals(timeFieldName, that.timeFieldName) &&
                 Objects.equals(attributesFieldName, that.attributesFieldName) 
&&
-                Objects.equals(charset, that.charset) &&
+                charset.equals(that.charset) &&
                 delimiter.equals(that.delimiter) &&
+                entryDelimiter.equals(that.entryDelimiter) &&
+                kvDelimiter.equals(that.kvDelimiter) &&
                 Objects.equals(escapeChar, that.escapeChar) &&
                 Objects.equals(quoteChar, that.quoteChar) &&
-                Objects.equals(nullLiteral, that.nullLiteral) &&
-                Objects.equals(metadataKeys, that.metadataKeys);
+                Objects.equals(nullLiteral, that.nullLiteral);
     }
 
     @Override
     public int hashCode() {
         return Objects.hash(super.hashCode(), rowFormatInfo, timeFieldName,
-                attributesFieldName, charset, delimiter, escapeChar, quoteChar,
-                nullLiteral, metadataKeys);
+                attributesFieldName, charset, delimiter, entryDelimiter, 
kvDelimiter,
+                escapeChar, quoteChar, nullLiteral);
     }
 }
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
similarity index 66%
copy from 
inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
copy to 
inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
index 2bf3ad6932..4ec1a4d149 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
@@ -15,24 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.formats.inlongmsgtlogcsv;
+package org.apache.inlong.sort.formats.inlongmsgtlogkv;
 
+import 
org.apache.inlong.sort.formats.base.FieldToRowDataConverters.FieldToRowDataConverter;
 import org.apache.inlong.sort.formats.common.FormatInfo;
 import org.apache.inlong.sort.formats.common.RowFormatInfo;
 import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
 import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
 
-import org.apache.flink.types.Row;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.flink.table.data.GenericRowData;
 
 import java.nio.charset.Charset;
 import java.sql.Timestamp;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 import static 
org.apache.inlong.sort.formats.base.TableFormatUtils.deserializeBasicField;
 import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TIME_DT;
@@ -42,20 +39,19 @@ import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseAttr;
 import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseDateTime;
 import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseEpochTime;
 import static org.apache.inlong.sort.formats.util.StringUtils.splitCsv;
+import static org.apache.inlong.sort.formats.util.StringUtils.splitKv;
 
 /**
- * Utilities for InLongMsgTlogCsv.
+ * Utilities for {@link InLongMsgTlogKv}.
  */
-public class InLongMsgTlogCsvUtils {
+public class InLongMsgTlogKvUtils {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(InLongMsgTlogCsvUtils.class);
+    public static final String DEFAULT_INLONGMSG_TLOGKV_CHARSET = "ISO_8859_1";
 
     public static InLongMsgHead parseHead(String attr) {
         Map<String, String> attributes = parseAttr(attr);
 
-        // Extracts time from the attributes
         Timestamp time;
-
         if (attributes.containsKey(INLONGMSG_ATTR_TIME_T)) {
             String date = attributes.get(INLONGMSG_ATTR_TIME_T).trim();
             time = parseDateTime(date);
@@ -68,7 +64,6 @@ public class InLongMsgTlogCsvUtils {
                             " or " + INLONGMSG_ATTR_TIME_DT + " in 
attributes!");
         }
 
-        // Extracts predefined fields from the attributes
         List<String> predefinedFields = getPredefinedFields(attributes);
 
         return new InLongMsgHead(attributes, null, time, predefinedFields);
@@ -78,6 +73,8 @@ public class InLongMsgTlogCsvUtils {
             byte[] bytes,
             String charset,
             char delimiter,
+            char entryDelimiter,
+            char kvDelimiter,
             Character escapeChar,
             Character quoteChar) {
         String text;
@@ -89,37 +86,37 @@ public class InLongMsgTlogCsvUtils {
 
         String[] segments = splitCsv(text, delimiter, escapeChar, quoteChar);
 
-        String tid = segments[0];
-        List<String> fields =
-                Arrays.stream(segments, 1, 
segments.length).collect(Collectors.toList());
+        String streamId = segments[0];
+
+        Map<String, String> entries;
+        if (segments.length > 1) {
+            entries = splitKv(segments[1], entryDelimiter, kvDelimiter, 
escapeChar, quoteChar);
+        } else {
+            entries = Collections.emptyMap();
+        }
 
-        return new InLongMsgBody(bytes, tid, fields, Collections.emptyMap());
+        return new InLongMsgBody(bytes, streamId, Collections.emptyList(), 
entries);
     }
 
     /**
-     * Deserializes the given fields into the row.
+     * Deserializes the row from the given entries.
      *
      * @param rowFormatInfo The format information of the row.
      * @param nullLiteral The literal for null values.
      * @param predefinedFields The predefined fields.
-     * @param fields The fields.
-     * @return The row deserialized from the row.
+     * @param entries The entries.
+     * @return The row deserialized from the given entries.
      */
-    public static Row deserializeRow(
+    public static GenericRowData deserializeRowData(
             RowFormatInfo rowFormatInfo,
             String nullLiteral,
             List<String> predefinedFields,
-            List<String> fields) {
+            Map<String, String> entries,
+            FieldToRowDataConverter[] converters) {
         String[] fieldNames = rowFormatInfo.getFieldNames();
         FormatInfo[] fieldFormatInfos = rowFormatInfo.getFieldFormatInfos();
 
-        int actualNumFields = predefinedFields.size() + fields.size();
-        if (actualNumFields != fieldNames.length) {
-            LOG.warn("The number of fields mismatches: " + fieldNames.length +
-                    " expected, but was " + actualNumFields + ".");
-        }
-
-        Row row = new Row(fieldNames.length);
+        GenericRowData rowData = new GenericRowData(fieldNames.length);
 
         for (int i = 0; i < predefinedFields.size(); ++i) {
 
@@ -129,42 +126,31 @@ public class InLongMsgTlogCsvUtils {
 
             String fieldName = fieldNames[i];
             FormatInfo fieldFormatInfo = fieldFormatInfos[i];
-
             String fieldText = predefinedFields.get(i);
-
-            Object field =
+            FieldToRowDataConverter converter = converters[i];
+            Object field = converter.convert(
                     deserializeBasicField(
                             fieldName,
                             fieldFormatInfo,
                             fieldText,
-                            nullLiteral);
-            row.setField(i, field);
+                            nullLiteral));
+            rowData.setField(i, field);
         }
 
-        for (int i = 0; i < fields.size(); ++i) {
-
-            if (i + predefinedFields.size() >= fieldNames.length) {
-                break;
-            }
-
-            String fieldName = fieldNames[i + predefinedFields.size()];
-            FormatInfo fieldFormatInfo = fieldFormatInfos[i + 
predefinedFields.size()];
-
-            String fieldText = fields.get(i);
-
-            Object field =
-                    deserializeBasicField(
-                            fieldName,
-                            fieldFormatInfo,
-                            fieldText,
-                            nullLiteral);
-            row.setField(i + predefinedFields.size(), field);
-        }
-
-        for (int i = predefinedFields.size() + fields.size(); i < 
fieldNames.length; ++i) {
-            row.setField(i, null);
+        for (int i = predefinedFields.size(); i < fieldNames.length; ++i) {
+            String fieldName = fieldNames[i];
+            FormatInfo fieldFormatInfo = fieldFormatInfos[i];
+            String fieldText = entries.get(fieldName);
+            FieldToRowDataConverter converter = converters[i];
+            Object field = converter.convert(deserializeBasicField(
+                    fieldName,
+                    fieldFormatInfo,
+                    fieldText,
+                    nullLiteral));
+            rowData.setField(i, field);
         }
 
-        return row;
+        return rowData;
     }
+
 }
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvValidator.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvValidator.java
new file mode 100644
index 0000000000..8b163a4bb2
--- /dev/null
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvValidator.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsgtlogkv;
+
+import org.apache.inlong.sort.formats.base.TextFormatDescriptorValidator;
+
+import org.apache.flink.table.descriptors.DescriptorProperties;
+
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_DELIMITER;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_ENTRY_DELIMITER;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_KV_DELIMITER;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.validateInLongMsgSchema;
+
+/**
+ * The validator for {@link InLongMsgTlogKv}.
+ */
+public class InLongMsgTlogKvValidator extends TextFormatDescriptorValidator {
+
+    @Override
+    public void validate(DescriptorProperties properties) {
+        super.validate(properties);
+
+        properties.validateString(FORMAT_DELIMITER, true, 1, 1);
+        properties.validateString(FORMAT_ENTRY_DELIMITER, true, 1, 1);
+        properties.validateString(FORMAT_KV_DELIMITER, true, 1, 1);
+
+        validateInLongMsgSchema(properties);
+    }
+}
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
new file mode 100644
index 0000000000..780d0af33b
--- /dev/null
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.inlong.sort.formats.inlongmsgtlogkv.InLongMsgTlogKvFormatFactory
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializerTest.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializerTest.java
new file mode 100644
index 0000000000..d76b63ba41
--- /dev/null
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializerTest.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsgtlogkv;
+
+import org.apache.inlong.common.msg.InLongMsg;
+import org.apache.inlong.sort.formats.base.FieldToRowDataConverters;
+import org.apache.inlong.sort.formats.common.FormatInfo;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.RowFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
+
+import org.apache.flink.api.common.functions.util.ListCollector;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.api.DataTypes.MAP;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_DELIMITER;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_ENTRY_DELIMITER;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_KV_DELIMITER;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsgtlogkv.InLongMsgTlogKvUtils.DEFAULT_INLONGMSG_TLOGKV_CHARSET;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link InLongMsgTlogKvFormatDeserializer}.
+ */
+public class InLongMsgTlogKvFormatDeserializerTest {
+
+    private final FieldToRowDataConverters.FieldToRowDataConverter mapConvert =
+            FieldToRowDataConverters.createConverter(MAP(STRING(), 
STRING()).getLogicalType());
+
+    private static final RowFormatInfo TEST_ROW_INFO =
+            new RowFormatInfo(
+                    new String[]{"pf1", "pf2", "f1", "f2", "f3"},
+                    new FormatInfo[]{
+                            IntFormatInfo.INSTANCE,
+                            IntFormatInfo.INSTANCE,
+                            StringFormatInfo.INSTANCE,
+                            StringFormatInfo.INSTANCE,
+                            StringFormatInfo.INSTANCE
+                    });
+
+    @Test
+    public void testExceptionHandler() throws Exception {
+        TestFailureHandler errorHandler = new TestFailureHandler();
+        InLongMsgTlogKvFormatDeserializer deserializer =
+                new InLongMsgTlogKvFormatDeserializer(
+                        TEST_ROW_INFO,
+                        DEFAULT_TIME_FIELD_NAME,
+                        DEFAULT_ATTRIBUTES_FIELD_NAME,
+                        DEFAULT_INLONGMSG_TLOGKV_CHARSET,
+                        DEFAULT_DELIMITER,
+                        DEFAULT_ENTRY_DELIMITER,
+                        DEFAULT_KV_DELIMITER,
+                        null,
+                        null,
+                        null,
+                        errorHandler);
+
+        InLongMsg inLongMsg1 = InLongMsg.newInLongMsg(true);
+        String attrs = "m=0&t=20200322&__addcol1_=1&__addcol2_=tes";
+        String body1 = "testInterfaceId1,f1=field1&f2=field2&f3=field3";
+        String body2 = "f1=field1&f2=field2&f3=field3";
+        inLongMsg1.addMsg(attrs, body1.getBytes());
+        inLongMsg1.addMsg(attrs, body2.getBytes());
+
+        List<RowData> actualRowDatas = new ArrayList<>();
+        Collector<RowData> collector = new ListCollector<>(actualRowDatas);
+        deserializer.flatMap(inLongMsg1.buildArray(), collector);
+        assertEquals(2, errorHandler.getRowCount());
+
+        InLongMsg inLongMsg1Head = InLongMsg.newInLongMsg();
+        String abNormalAttrs = "m=0&__addcol1_=1&__addcol2_=tes";
+        inLongMsg1Head.addMsg(abNormalAttrs, body1.getBytes());
+        inLongMsg1Head.addMsg(abNormalAttrs, body2.getBytes());
+        deserializer.flatMap(inLongMsg1Head.buildArray(), collector);
+        assertEquals(1, errorHandler.getHeadCount());
+    }
+
+    @Test
+    public void testDeserialize() throws Exception {
+        InLongMsg inLongMsg1 = InLongMsg.newInLongMsg(true);
+
+        String attrs = "m=0&t=20200322&__addcol1_=1&__addcol2_=2";
+        String body1 = "testInterfaceId1,f1=field1&f2=field2&f3=field3";
+        String body2 = "f1=field1&f2=field2&f3=field3";
+        String body3 = "f1=field1&f2=field2,f1=field1&f2=field2&f3=field3";
+        String body4 = ",testInterfaceId1,f1=field1&f2=field2&f3=field3";
+
+        inLongMsg1.addMsg(attrs, body1.getBytes());
+        inLongMsg1.addMsg(attrs, body2.getBytes());
+        inLongMsg1.addMsg(attrs, body3.getBytes());
+        inLongMsg1.addMsg(attrs, body4.getBytes());
+
+        Map<String, String> expectedAttributes = new HashMap<>();
+        expectedAttributes.put("m", "0");
+        expectedAttributes.put("t", "20200322");
+        expectedAttributes.put("__addcol1_", "1");
+        expectedAttributes.put("__addcol2_", "2");
+
+        GenericRowData expectRowData1 = new GenericRowData(7);
+        expectRowData1.setField(0, 
TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00")));
+        expectRowData1.setField(1, mapConvert.convert(expectedAttributes));
+        expectRowData1.setField(2, 1);
+        expectRowData1.setField(3, 2);
+        expectRowData1.setField(4, StringData.fromString("field1"));
+        expectRowData1.setField(5, StringData.fromString("field2"));
+        expectRowData1.setField(6, StringData.fromString("field3"));
+
+        GenericRowData expectRowData2 = new GenericRowData(7);
+        expectRowData2.setField(0, 
TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00")));
+        expectRowData2.setField(1, mapConvert.convert(expectedAttributes));
+        expectRowData2.setField(2, 1);
+        expectRowData2.setField(3, 2);
+        expectRowData2.setField(4, null);
+        expectRowData2.setField(5, null);
+        expectRowData2.setField(6, null);
+
+        GenericRowData expectRowData3 = new GenericRowData(7);
+        expectRowData3.setField(0, 
TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00")));
+        expectRowData3.setField(1, mapConvert.convert(expectedAttributes));
+        expectRowData3.setField(2, 1);
+        expectRowData3.setField(3, 2);
+        expectRowData3.setField(4, StringData.fromString("field1"));
+        expectRowData3.setField(5, StringData.fromString("field2"));
+        expectRowData3.setField(6, StringData.fromString("field3"));
+
+        GenericRowData expectRowData4 = new GenericRowData(7);
+        expectRowData4.setField(0, 
TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00")));
+        expectRowData4.setField(1, mapConvert.convert(expectedAttributes));
+        expectRowData4.setField(2, 1);
+        expectRowData4.setField(3, 2);
+        expectRowData4.setField(4, StringData.fromString("field1"));
+        expectRowData4.setField(5, StringData.fromString("field2"));
+        expectRowData4.setField(6, StringData.fromString("field3"));
+
+        testRowDataDeserialization(inLongMsg1.buildArray(),
+                Arrays.asList(expectRowData1, expectRowData2, expectRowData3, 
expectRowData4));
+    }
+
+    private void testRowDataDeserialization(
+            byte[] bytes,
+            List<RowData> expectedRows) throws Exception {
+        InLongMsgTlogKvFormatDeserializer deserializer =
+                new InLongMsgTlogKvFormatDeserializer.Builder(TEST_ROW_INFO)
+                        .setTimeFieldName("inlongmsg_time")
+                        .setAttributesFieldName("inlongmsg_attributes")
+                        .build();
+
+        List<RowData> actualRowDatas = new ArrayList<>();
+        Collector<RowData> collector = new ListCollector<>(actualRowDatas);
+
+        deserializer.flatMap(bytes, collector);
+
+        assertEquals(expectedRows, actualRowDatas);
+    }
+
+    private static class TestFailureHandler implements FailureHandler {
+
+        private int headCount = 0;
+        private int bodyCount = 0;
+        private int rowCount = 0;
+
+        public int getHeadCount() {
+            return headCount;
+        }
+
+        public int getBodyCount() {
+            return bodyCount;
+        }
+
+        public int getRowCount() {
+            return rowCount;
+        }
+
+        @Override
+        public void onParsingHeadFailure(String attribute, Exception 
exception) throws Exception {
+            headCount++;
+        }
+
+        @Override
+        public void onParsingBodyFailure(InLongMsgHead head, byte[] body, 
Exception exception) throws Exception {
+            bodyCount++;
+        }
+
+        @Override
+        public void onConvertingRowFailure(InLongMsgHead head,
+                InLongMsgBody body, Exception exception) throws Exception {
+            rowCount++;
+        }
+    }
+}
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvTest.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvTest.java
new file mode 100644
index 0000000000..77bcec55f5
--- /dev/null
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsgtlogkv;
+
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgValidator;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.descriptors.Descriptor;
+import org.apache.flink.table.descriptors.DescriptorTestBase;
+import org.apache.flink.table.descriptors.DescriptorValidator;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tests for the {@link InLongMsgTlogKv} descriptor.
+ */
+public class InLongMsgTlogKvTest extends DescriptorTestBase {
+
+    private static final String TEST_SCHEMA =
+            "{" +
+                    "\"type\":\"row\"," +
+                    "\"fieldFormats\":[{" +
+                    "\"name\":\"student_name\"," +
+                    "\"format\":{\"type\":\"string\"}" +
+                    "},{" +
+                    "\"name\":\"score\"," +
+                    "\"format\":{\"type\":\"int\"}" +
+                    "},{" +
+                    "\"name\":\"date\"," +
+                    "\"format\":{" +
+                    "\"type\":\"date\"," +
+                    "\"format\":\"yyyy-MM-dd\"" +
+                    "}" +
+                    "}]" +
+                    "}";
+
+    private static final Descriptor CUSTOM_DESCRIPTOR_WITH_SCHEMA =
+            new InLongMsgTlogKv()
+                    .schema(TEST_SCHEMA)
+                    .timeFieldName("time")
+                    .attributesFieldName("attributes")
+                    .delimiter(';')
+                    .entryDelimiter('&')
+                    .kvDelimiter('=')
+                    .charset(StandardCharsets.UTF_8)
+                    .escapeCharacter('\\')
+                    .quoteCharacter('\"')
+                    .nullLiteral("n/a")
+                    .ignoreErrors();
+
+    @Test(expected = ValidationException.class)
+    public void testInvalidIgnoreParseErrors() {
+        addPropertyAndVerify(CUSTOM_DESCRIPTOR_WITH_SCHEMA, 
"format.escape-character", "DDD");
+    }
+
+    @Test(expected = ValidationException.class)
+    public void testMissingSchema() {
+        removePropertyAndVerify(CUSTOM_DESCRIPTOR_WITH_SCHEMA, 
"format.schema");
+    }
+
+    @Override
+    public List<Descriptor> descriptors() {
+        return Collections.singletonList(CUSTOM_DESCRIPTOR_WITH_SCHEMA);
+    }
+
+    @Override
+    public List<Map<String, String>> properties() {
+        final Map<String, String> props1 = new HashMap<>();
+        props1.put("format.type", "inlong-msg-tlogkv");
+        props1.put("format.property-version", "1");
+        props1.put("format.schema", TEST_SCHEMA);
+        props1.put("format.time-field-name", "time");
+        props1.put("format.attribute-field-name", "attributes");
+        props1.put("format.delimiter", ";");
+        props1.put("format.entry-delimiter", "&");
+        props1.put("format.kv-delimiter", "=");
+        props1.put("format.charset", "UTF-8");
+        props1.put("format.escape-character", "\\");
+        props1.put("format.quote-character", "\"");
+        props1.put("format.null-literal", "n/a");
+        props1.put("format.ignore-errors", "true");
+
+        return Collections.singletonList(props1);
+    }
+
+    @Override
+    public DescriptorValidator validator() {
+        return new InLongMsgValidator();
+    }
+}
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/test/resources/log4j-test.properties
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000000..881dc0609b
--- /dev/null
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to DEBUG and its only appender to A1.
+log4j.rootLogger=OFF, A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
diff --git a/inlong-sort/sort-formats/format-rowdata/pom.xml 
b/inlong-sort/sort-formats/format-rowdata/pom.xml
index daf7b05ef5..6b5f97b8fa 100644
--- a/inlong-sort/sort-formats/format-rowdata/pom.xml
+++ b/inlong-sort/sort-formats/format-rowdata/pom.xml
@@ -42,6 +42,7 @@
         <module>format-inlongmsg-rowdata-kv</module>
         <module>format-inlongmsg-rowdata-csv</module>
         <module>format-inlongmsg-rowdata-tlogcsv</module>
+        <module>format-inlongmsg-rowdata-tlogkv</module>
     </modules>
 
     <properties>
diff --git a/inlong-sort/sort-formats/pom.xml b/inlong-sort/sort-formats/pom.xml
index d069c683d0..15804be0c2 100644
--- a/inlong-sort/sort-formats/pom.xml
+++ b/inlong-sort/sort-formats/pom.xml
@@ -110,6 +110,7 @@
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
+
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-table-common</artifactId>

Reply via email to