This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 29574c04cf [INLONG-9774][Sort] Support rowdata way of sort InLong
message tlog-kv format (#9780)
29574c04cf is described below
commit 29574c04cfca4b6f651b6bd35c8dfdba30b25d89
Author: baomingyu <[email protected]>
AuthorDate: Thu Mar 7 16:47:07 2024 +0800
[INLONG-9774][Sort] Support rowdata way of sort InLong message tlog-kv
format (#9780)
---
.../sort/formats/inlongmsg/InLongMsgUtils.java | 35 +++-
.../InLongMsgTlogCsvFormatDeserializer.java | 13 +-
.../inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java | 26 +--
.../InLongMsgTlogCsvFormatDeserializerTest.java | 23 ++-
.../format-inlongmsg-rowdata-tlogkv/pom.xml | 136 +++++++++++++
.../formats/inlongmsgtlogkv/InLongMsgTlogKv.java | 96 +++++++++
.../InLongMsgTlogKvFormatDeserializer.java} | 144 ++++++++-----
.../inlongmsgtlogkv/InLongMsgTlogKvUtils.java} | 98 ++++-----
.../inlongmsgtlogkv/InLongMsgTlogKvValidator.java | 44 ++++
.../org.apache.flink.table.factories.TableFactory | 16 ++
.../InLongMsgTlogKvFormatDeserializerTest.java | 223 +++++++++++++++++++++
.../inlongmsgtlogkv/InLongMsgTlogKvTest.java | 110 ++++++++++
.../src/test/resources/log4j-test.properties | 27 +++
inlong-sort/sort-formats/format-rowdata/pom.xml | 1 +
inlong-sort/sort-formats/pom.xml | 1 +
15 files changed, 858 insertions(+), 135 deletions(-)
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
index ce0b5c9dbe..1fd82e0d82 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
@@ -33,7 +33,6 @@ import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.VarCharType;
-import org.apache.flink.types.Row;
import javax.annotation.Nullable;
@@ -229,6 +228,35 @@ public class InLongMsgUtils {
return InternalTypeInfo.of(rowType);
}
+ public static TypeInformation<RowData>
decorateRowDataTypeWithNeededHeadFields(@Nullable String timeFieldName,
+ @Nullable String attributesFieldName,
+ RowType dataRowType) {
+ List<String> fieldNames = new ArrayList<>();
+ List<LogicalType> fieldTypes = new ArrayList<>();
+
+ // Timestamp and attribute field
+ if (timeFieldName != null) {
+ fieldNames.add(timeFieldName);
+ fieldTypes.add(new TimestampType());
+ }
+
+ if (attributesFieldName != null) {
+ fieldNames.add(attributesFieldName);
+ fieldTypes.add(new MapType(new VarCharType(), new VarCharType()));
+ }
+
+ // Physical fields
+ List<RowType.RowField> fields = dataRowType.getFields();
+ for (RowType.RowField field : fields) {
+ fieldNames.add(field.getName());
+ fieldTypes.add(field.getType());
+ }
+
+ RowType rowType = RowType.of(fieldTypes.toArray(new LogicalType[0]),
fieldNames.toArray(new String[0]));
+
+ return InternalTypeInfo.of(rowType);
+ }
+
/**
* Creates the type information with given field names and data schema.
*
@@ -323,8 +351,7 @@ public class InLongMsgUtils {
@Nullable String attributesFieldName,
Timestamp time,
Map<String, String> attributes,
- Row dataRow,
- FieldToRowDataConverters.FieldToRowDataConverter[] converters) {
+ GenericRowData dataRow) {
List<Object> headFields = new ArrayList<>();
if (timeFieldName != null) {
headFields.add(TIME_FIELD_CONVERTER.convert(time));
@@ -340,7 +367,7 @@ public class InLongMsgUtils {
}
for (int i = 0; i < dataRow.getArity(); ++i) {
- rowData.setField(i + headFields.size(),
converters[i].convert(dataRow.getField(i)));
+ rowData.setField(i + headFields.size(), dataRow.getField(i));
}
return rowData;
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java
index 32f2edb7ca..3a9f4c001b 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java
@@ -31,7 +31,6 @@ import
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.types.Row;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -178,20 +177,20 @@ public final class InLongMsgTlogCsvFormatDeserializer
extends AbstractInLongMsgF
@Override
protected List<RowData> convertRowDataList(InLongMsgHead head,
InLongMsgBody body) throws Exception {
- Row dataRow =
- InLongMsgTlogCsvUtils.deserializeRow(
+ GenericRowData dataRow =
+ InLongMsgTlogCsvUtils.deserializeRowData(
rowFormatInfo,
nullLiteral,
head.getPredefinedFields(),
- body.getFields());
+ body.getFields(),
+ converters);
- GenericRowData genericRowData = (GenericRowData)
InLongMsgUtils.decorateRowWithNeededHeadFields(
+ GenericRowData genericRowData = (GenericRowData)
InLongMsgUtils.decorateRowDataWithNeededHeadFields(
timeFieldName,
attributesFieldName,
head.getTime(),
head.getAttributes(),
- dataRow,
- converters);
+ dataRow);
return
Collections.singletonList(InLongMsgUtils.decorateRowWithMetaData(genericRowData,
head, metadataKeys));
}
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
index 2bf3ad6932..19c77fca7d 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
@@ -17,12 +17,13 @@
package org.apache.inlong.sort.formats.inlongmsgtlogcsv;
+import org.apache.inlong.sort.formats.base.FieldToRowDataConverters;
import org.apache.inlong.sort.formats.common.FormatInfo;
import org.apache.inlong.sort.formats.common.RowFormatInfo;
import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
-import org.apache.flink.types.Row;
+import org.apache.flink.table.data.GenericRowData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -105,11 +106,12 @@ public class InLongMsgTlogCsvUtils {
* @param fields The fields.
* @return The row deserialized from the row.
*/
- public static Row deserializeRow(
+ public static GenericRowData deserializeRowData(
RowFormatInfo rowFormatInfo,
String nullLiteral,
List<String> predefinedFields,
- List<String> fields) {
+ List<String> fields,
+ FieldToRowDataConverters.FieldToRowDataConverter[] converters) {
String[] fieldNames = rowFormatInfo.getFieldNames();
FormatInfo[] fieldFormatInfos = rowFormatInfo.getFieldFormatInfos();
@@ -119,7 +121,7 @@ public class InLongMsgTlogCsvUtils {
" expected, but was " + actualNumFields + ".");
}
- Row row = new Row(fieldNames.length);
+ GenericRowData rowData = new GenericRowData(fieldNames.length);
for (int i = 0; i < predefinedFields.size(); ++i) {
@@ -133,12 +135,12 @@ public class InLongMsgTlogCsvUtils {
String fieldText = predefinedFields.get(i);
Object field =
- deserializeBasicField(
+ converters[i].convert(deserializeBasicField(
fieldName,
fieldFormatInfo,
fieldText,
- nullLiteral);
- row.setField(i, field);
+ nullLiteral));
+ rowData.setField(i, field);
}
for (int i = 0; i < fields.size(); ++i) {
@@ -153,18 +155,18 @@ public class InLongMsgTlogCsvUtils {
String fieldText = fields.get(i);
Object field =
- deserializeBasicField(
+ converters[i +
predefinedFields.size()].convert(deserializeBasicField(
fieldName,
fieldFormatInfo,
fieldText,
- nullLiteral);
- row.setField(i + predefinedFields.size(), field);
+ nullLiteral));
+ rowData.setField(i + predefinedFields.size(), field);
}
for (int i = predefinedFields.size() + fields.size(); i <
fieldNames.length; ++i) {
- row.setField(i, null);
+ rowData.setField(i, null);
}
- return row;
+ return rowData;
}
}
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java
index 65423431be..ee78cd9ce2 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java
@@ -44,7 +44,6 @@ import org.junit.Test;
import java.sql.Timestamp;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -147,9 +146,11 @@ public class InLongMsgTlogCsvFormatDeserializerTest {
expectRowData2.setField(5, StringData.fromString("field2"));
expectRowData2.setField(6, StringData.fromString("field3"));
- testRowDeserialization(
- inLongMsg1.buildArray(),
- Arrays.asList(expectRowData1, expectRowData2));
+ List expList = new ArrayList<>();
+ expList.add(expectRowData1);
+ expList.add(expectRowData2);
+
+ testRowDeserialization(inLongMsg1.buildArray(), expList);
}
@Test
@@ -174,9 +175,10 @@ public class InLongMsgTlogCsvFormatDeserializerTest {
expectRowData.setField(5, StringData.fromString("field2"));
expectRowData.setField(6, StringData.fromString("field3"));
+ List expList = new ArrayList<>();
+ expList.add(expectRowData);
testRowDeserialization(
- inLongMsg1.buildArray(),
- Collections.singletonList(expectRowData));
+ inLongMsg1.buildArray(), expList);
}
@Test
@@ -211,10 +213,11 @@ public class InLongMsgTlogCsvFormatDeserializerTest {
expectRowData2.setField(4, StringData.fromString("field1"));
expectRowData2.setField(5, StringData.fromString("field2"));
expectRowData2.setField(6, StringData.fromString("field3"));
-
+ List expList = new ArrayList<>();
+ expList.add(expectRowData1);
+ expList.add(expectRowData2);
testRowDeserialization(
- inLongMsg1.buildArray(),
- Arrays.asList(expectRowData1, expectRowData2));
+ inLongMsg1.buildArray(), expList);
}
@Test
@@ -294,9 +297,7 @@ public class InLongMsgTlogCsvFormatDeserializerTest {
List<RowData> actualRows = new ArrayList<>();
Collector<RowData> collector = new ListCollector<>(actualRows);
-
deserializer.flatMap(bytes, collector);
-
assertEquals(expectedRows, actualRows);
}
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/pom.xml
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/pom.xml
new file mode 100644
index 0000000000..2d969bb54d
--- /dev/null
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/pom.xml
@@ -0,0 +1,136 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>format-rowdata</artifactId>
+ <version>1.12.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>format-inlongmsg-rowdata-tlogkv</artifactId>
+
+ <packaging>jar</packaging>
+ <name>Apache InLong - Sort Format-InLongMsg-Rowdata-TLogKV</name>
+
+ <properties>
+
<inlong.root.dir>${project.parent.parent.parent.parent.basedir}</inlong.root.dir>
+ </properties>
+
+ <dependencies>
+
+ <!-- core dependencies -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-format-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-format-rowdata-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-format-inlongmsg-rowdata-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ <version>${snappy.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-shaded-jackson</artifactId>
+ <version>${flink.jackson.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- test dependencies -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-runtime</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
+ <version>${flink.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+ <profiles>
+ <!-- Create SQL Client uber jars by default -->
+ <profile>
+ <id>sql-jars</id>
+ <activation>
+ <property>
+ <name>!skipSqlJars</name>
+ </property>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ <phase>package</phase>
+ <configuration>
+ <classifier>sql-jar</classifier>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+</project>
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKv.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKv.java
new file mode 100644
index 0000000000..2a66e15680
--- /dev/null
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKv.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsgtlogkv;
+
+import org.apache.inlong.sort.formats.base.TextFormatDescriptor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_ATTRIBUTE_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_DELIMITER;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_ENTRY_DELIMITER;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_KV_DELIMITER;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_TIME_FIELD_NAME;
+
+/**
+ * Format descriptor for InLongMsgTlogKVs.
+ */
+public class InLongMsgTlogKv extends TextFormatDescriptor<InLongMsgTlogKv> {
+
+ public static final String FORMAT_TYPE_VALUE = "inlong-msg-tlogkv";
+
+ public InLongMsgTlogKv() {
+ super(FORMAT_TYPE_VALUE, 1);
+ }
+
+ /**
+ * Sets the delimiter character (',' by default).
+ *
+ * @param delimiter the field delimiter character.
+ * @return The instance which has delimiter set.
+ */
+ public InLongMsgTlogKv delimiter(char delimiter) {
+ internalProperties.putCharacter(FORMAT_DELIMITER, delimiter);
+ return this;
+ }
+
+ /**
+ * Sets the entry delimiter character ('&' by default).
+ *
+ * @param delimiter the entry delimiter character.
+ * @return The instance which has delimiter set.
+ */
+ public InLongMsgTlogKv entryDelimiter(char delimiter) {
+ internalProperties.putCharacter(FORMAT_ENTRY_DELIMITER, delimiter);
+ return this;
+ }
+
+ /**
+ * Sets the kv delimiter character ('=' by default).
+ *
+ * @param delimiter the kv delimiter character.
+ * @return The instance which has delimiter set.
+ */
+ public InLongMsgTlogKv kvDelimiter(char delimiter) {
+ internalProperties.putCharacter(FORMAT_KV_DELIMITER, delimiter);
+ return this;
+ }
+
+ /**
+ * Sets the name of the time field.
+ *
+ * @param timeFieldName The name of the time field.
+ * @return The instance which has time field name set.
+ */
+ public InLongMsgTlogKv timeFieldName(String timeFieldName) {
+ checkNotNull(timeFieldName);
+ internalProperties.putString(FORMAT_TIME_FIELD_NAME, timeFieldName);
+ return this;
+ }
+
+ /**
+ * Sets the name of the attributes field.
+ *
+ * @param attributesFieldName The name of the attributes field.
+ * @return The instance which has attributes field name set.
+ */
+ public InLongMsgTlogKv attributesFieldName(String attributesFieldName) {
+ checkNotNull(attributesFieldName);
+ internalProperties.putString(FORMAT_ATTRIBUTE_FIELD_NAME,
attributesFieldName);
+ return this;
+ }
+}
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializer.java
similarity index 59%
copy from
inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java
copy to
inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializer.java
index 32f2edb7ca..2b8ad4c07c 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializer.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.formats.inlongmsgtlogcsv;
+package org.apache.inlong.sort.formats.inlongmsgtlogkv;
import org.apache.inlong.sort.formats.base.FieldToRowDataConverters;
import
org.apache.inlong.sort.formats.base.FieldToRowDataConverters.FieldToRowDataConverter;
@@ -31,7 +31,8 @@ import
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.types.Row;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.types.logical.RowType;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -42,13 +43,21 @@ import java.util.List;
import java.util.Objects;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_DELIMITER;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_ENTRY_DELIMITER;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_KV_DELIMITER;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_ATTRIBUTE_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_DELIMITER;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_ENTRY_DELIMITER;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_KV_DELIMITER;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_TIME_FIELD_NAME;
import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsgtlogkv.InLongMsgTlogKvUtils.DEFAULT_INLONGMSG_TLOGKV_CHARSET;
/**
- * The deserializer for the records in InLongMsgTlogCsv format.
+ * The deserializer for the records in InLongMsgTlogKV format.
*/
-public final class InLongMsgTlogCsvFormatDeserializer extends
AbstractInLongMsgFormatDeserializer {
+public final class InLongMsgTlogKvFormatDeserializer extends
AbstractInLongMsgFormatDeserializer {
private static final long serialVersionUID = 1L;
@@ -73,6 +82,7 @@ public final class InLongMsgTlogCsvFormatDeserializer extends
AbstractInLongMsgF
/**
* The charset of the text.
*/
+ @Nonnull
private final String charset;
/**
@@ -81,6 +91,18 @@ public final class InLongMsgTlogCsvFormatDeserializer
extends AbstractInLongMsgF
@Nonnull
private final Character delimiter;
+ /**
+ * The delimiter between entries.
+ */
+ @Nonnull
+ private final Character entryDelimiter;
+
+ /**
+ * The delimiter between key and value.
+ */
+ @Nonnull
+ private final Character kvDelimiter;
+
/**
* Escape character. Null if escaping is disabled.
*/
@@ -99,20 +121,19 @@ public final class InLongMsgTlogCsvFormatDeserializer
extends AbstractInLongMsgF
@Nullable
private final String nullLiteral;
- private final List<String> metadataKeys;
-
private final FieldToRowDataConverter[] converters;
- public InLongMsgTlogCsvFormatDeserializer(
+ public InLongMsgTlogKvFormatDeserializer(
@Nonnull RowFormatInfo rowFormatInfo,
@Nullable String timeFieldName,
@Nullable String attributesFieldName,
@Nonnull String charset,
@Nonnull Character delimiter,
+ @Nonnull Character entryDelimiter,
+ @Nonnull Character kvDelimiter,
@Nullable Character escapeChar,
@Nullable Character quoteChar,
@Nullable String nullLiteral,
- List<String> metadataKeys,
@Nonnull Boolean ignoreErrors) {
this(
rowFormatInfo,
@@ -120,23 +141,25 @@ public final class InLongMsgTlogCsvFormatDeserializer
extends AbstractInLongMsgF
attributesFieldName,
charset,
delimiter,
+ entryDelimiter,
+ kvDelimiter,
escapeChar,
quoteChar,
nullLiteral,
- metadataKeys,
InLongMsgUtils.getDefaultExceptionHandler(ignoreErrors));
}
- public InLongMsgTlogCsvFormatDeserializer(
+ public InLongMsgTlogKvFormatDeserializer(
@Nonnull RowFormatInfo rowFormatInfo,
@Nullable String timeFieldName,
@Nullable String attributesFieldName,
@Nonnull String charset,
@Nonnull Character delimiter,
+ @Nonnull Character entryDelimiter,
+ @Nonnull Character kvDelimiter,
@Nullable Character escapeChar,
@Nullable Character quoteChar,
@Nullable String nullLiteral,
- List<String> metadataKeys,
@Nonnull FailureHandler failureHandler) {
super(failureHandler);
@@ -145,69 +168,76 @@ public final class InLongMsgTlogCsvFormatDeserializer
extends AbstractInLongMsgF
this.attributesFieldName = attributesFieldName;
this.charset = charset;
this.delimiter = delimiter;
+ this.entryDelimiter = entryDelimiter;
+ this.kvDelimiter = kvDelimiter;
this.escapeChar = escapeChar;
this.quoteChar = quoteChar;
this.nullLiteral = nullLiteral;
- this.metadataKeys = metadataKeys;
- converters = Arrays.stream(rowFormatInfo.getFieldFormatInfos())
+ this.converters = Arrays.stream(rowFormatInfo.getFieldFormatInfos())
.map(formatInfo -> FieldToRowDataConverters.createConverter(
TableFormatUtils.deriveLogicalType(formatInfo)))
- .toArray(FieldToRowDataConverter[]::new);
+
.toArray(FieldToRowDataConverters.FieldToRowDataConverter[]::new);
}
@Override
public TypeInformation<RowData> getProducedType() {
- return InLongMsgUtils.decorateRowTypeWithNeededHeadFieldsAndMetadata(
- timeFieldName,
- attributesFieldName,
- rowFormatInfo,
- metadataKeys);
+ return
InLongMsgUtils.decorateRowDataTypeWithNeededHeadFields(timeFieldName,
+ attributesFieldName, (RowType)
TableFormatUtils.deriveLogicalType(rowFormatInfo));
}
@Override
protected InLongMsgHead parseHead(String attr) throws Exception {
- return InLongMsgTlogCsvUtils.parseHead(attr);
+ return InLongMsgTlogKvUtils.parseHead(attr);
}
@Override
protected List<InLongMsgBody> parseBodyList(byte[] bytes) throws Exception
{
return Collections.singletonList(
- InLongMsgTlogCsvUtils.parseBody(bytes, charset, delimiter,
escapeChar, quoteChar));
+ InLongMsgTlogKvUtils.parseBody(
+ bytes,
+ charset,
+ delimiter,
+ entryDelimiter,
+ kvDelimiter,
+ escapeChar,
+ quoteChar));
}
@Override
protected List<RowData> convertRowDataList(InLongMsgHead head,
InLongMsgBody body) throws Exception {
- Row dataRow =
- InLongMsgTlogCsvUtils.deserializeRow(
+ GenericRowData genericRowData =
+ InLongMsgTlogKvUtils.deserializeRowData(
rowFormatInfo,
nullLiteral,
head.getPredefinedFields(),
- body.getFields());
+ body.getEntries(), converters);
- GenericRowData genericRowData = (GenericRowData)
InLongMsgUtils.decorateRowWithNeededHeadFields(
+ RowData rowData = InLongMsgUtils.decorateRowWithNeededHeadFields(
timeFieldName,
attributesFieldName,
head.getTime(),
head.getAttributes(),
- dataRow,
- converters);
+ genericRowData);
- return
Collections.singletonList(InLongMsgUtils.decorateRowWithMetaData(genericRowData,
head, metadataKeys));
+ return Collections.singletonList(rowData);
}
/**
- * The builder for {@link InLongMsgTlogCsvFormatDeserializer}.
+ * The builder for {@link InLongMsgTlogKvFormatDeserializer}.
*/
public static class Builder extends TextFormatBuilder<Builder> {
private String timeFieldName = DEFAULT_TIME_FIELD_NAME;
private String attributesFieldName = DEFAULT_ATTRIBUTES_FIELD_NAME;
private Character delimiter = DEFAULT_DELIMITER;
- private List<String> metadataKeys = Collections.emptyList();
+ private Character entryDelimiter = DEFAULT_ENTRY_DELIMITER;
+ private Character kvDelimiter = DEFAULT_KV_DELIMITER;
public Builder(RowFormatInfo rowFormatInfo) {
super(rowFormatInfo);
+
+ this.charset = DEFAULT_INLONGMSG_TLOGKV_CHARSET;
}
public Builder setTimeFieldName(String timeFieldName) {
@@ -225,56 +255,80 @@ public final class InLongMsgTlogCsvFormatDeserializer
extends AbstractInLongMsgF
return this;
}
- public Builder setMetadataKeys(List<String> metadataKeys) {
- this.metadataKeys = metadataKeys;
+ public Builder setEntryDelimiter(Character entryDelimiter) {
+ this.entryDelimiter = entryDelimiter;
+ return this;
+ }
+
+ public Builder setKvDelimiter(Character kvDelimiter) {
+ this.kvDelimiter = kvDelimiter;
+ return this;
+ }
+
+ public Builder configure(DescriptorProperties descriptorProperties) {
+ super.configure(descriptorProperties);
+
+ descriptorProperties.getOptionalString(FORMAT_TIME_FIELD_NAME)
+ .ifPresent(this::setTimeFieldName);
+ descriptorProperties.getOptionalString(FORMAT_ATTRIBUTE_FIELD_NAME)
+ .ifPresent(this::setAttributesFieldName);
+ descriptorProperties.getOptionalCharacter(FORMAT_DELIMITER)
+ .ifPresent(this::setDelimiter);
+ descriptorProperties.getOptionalCharacter(FORMAT_ENTRY_DELIMITER)
+ .ifPresent(this::setEntryDelimiter);
+ descriptorProperties.getOptionalCharacter(FORMAT_KV_DELIMITER)
+ .ifPresent(this::setKvDelimiter);
+
return this;
}
- public InLongMsgTlogCsvFormatDeserializer build() {
- return new InLongMsgTlogCsvFormatDeserializer(
+ public InLongMsgTlogKvFormatDeserializer build() {
+ return new InLongMsgTlogKvFormatDeserializer(
rowFormatInfo,
timeFieldName,
attributesFieldName,
charset,
delimiter,
+ entryDelimiter,
+ kvDelimiter,
escapeChar,
quoteChar,
nullLiteral,
- metadataKeys,
ignoreErrors);
}
}
@Override
- public boolean equals(Object o) {
- if (this == o) {
+ public boolean equals(Object object) {
+ if (this == object) {
return true;
}
- if (o == null || getClass() != o.getClass()) {
+ if (object == null || getClass() != object.getClass()) {
return false;
}
- if (!super.equals(o)) {
+ if (!super.equals(object)) {
return false;
}
- InLongMsgTlogCsvFormatDeserializer that =
(InLongMsgTlogCsvFormatDeserializer) o;
+ InLongMsgTlogKvFormatDeserializer that =
(InLongMsgTlogKvFormatDeserializer) object;
return rowFormatInfo.equals(that.rowFormatInfo) &&
Objects.equals(timeFieldName, that.timeFieldName) &&
Objects.equals(attributesFieldName, that.attributesFieldName)
&&
- Objects.equals(charset, that.charset) &&
+ charset.equals(that.charset) &&
delimiter.equals(that.delimiter) &&
+ entryDelimiter.equals(that.entryDelimiter) &&
+ kvDelimiter.equals(that.kvDelimiter) &&
Objects.equals(escapeChar, that.escapeChar) &&
Objects.equals(quoteChar, that.quoteChar) &&
- Objects.equals(nullLiteral, that.nullLiteral) &&
- Objects.equals(metadataKeys, that.metadataKeys);
+ Objects.equals(nullLiteral, that.nullLiteral);
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), rowFormatInfo, timeFieldName,
- attributesFieldName, charset, delimiter, escapeChar, quoteChar,
- nullLiteral, metadataKeys);
+ attributesFieldName, charset, delimiter, entryDelimiter,
kvDelimiter,
+ escapeChar, quoteChar, nullLiteral);
}
}
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
similarity index 66%
copy from
inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
copy to
inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
index 2bf3ad6932..4ec1a4d149 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
@@ -15,24 +15,21 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.formats.inlongmsgtlogcsv;
+package org.apache.inlong.sort.formats.inlongmsgtlogkv;
+import
org.apache.inlong.sort.formats.base.FieldToRowDataConverters.FieldToRowDataConverter;
import org.apache.inlong.sort.formats.common.FormatInfo;
import org.apache.inlong.sort.formats.common.RowFormatInfo;
import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
-import org.apache.flink.types.Row;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.flink.table.data.GenericRowData;
import java.nio.charset.Charset;
import java.sql.Timestamp;
-import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
import static
org.apache.inlong.sort.formats.base.TableFormatUtils.deserializeBasicField;
import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TIME_DT;
@@ -42,20 +39,19 @@ import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseAttr;
import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseDateTime;
import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseEpochTime;
import static org.apache.inlong.sort.formats.util.StringUtils.splitCsv;
+import static org.apache.inlong.sort.formats.util.StringUtils.splitKv;
/**
- * Utilities for InLongMsgTlogCsv.
+ * Utilities for {@link InLongMsgTlogKv}.
*/
-public class InLongMsgTlogCsvUtils {
+public class InLongMsgTlogKvUtils {
- private static final Logger LOG =
LoggerFactory.getLogger(InLongMsgTlogCsvUtils.class);
+ public static final String DEFAULT_INLONGMSG_TLOGKV_CHARSET = "ISO_8859_1";
public static InLongMsgHead parseHead(String attr) {
Map<String, String> attributes = parseAttr(attr);
- // Extracts time from the attributes
Timestamp time;
-
if (attributes.containsKey(INLONGMSG_ATTR_TIME_T)) {
String date = attributes.get(INLONGMSG_ATTR_TIME_T).trim();
time = parseDateTime(date);
@@ -68,7 +64,6 @@ public class InLongMsgTlogCsvUtils {
" or " + INLONGMSG_ATTR_TIME_DT + " in
attributes!");
}
- // Extracts predefined fields from the attributes
List<String> predefinedFields = getPredefinedFields(attributes);
return new InLongMsgHead(attributes, null, time, predefinedFields);
@@ -78,6 +73,8 @@ public class InLongMsgTlogCsvUtils {
byte[] bytes,
String charset,
char delimiter,
+ char entryDelimiter,
+ char kvDelimiter,
Character escapeChar,
Character quoteChar) {
String text;
@@ -89,37 +86,37 @@ public class InLongMsgTlogCsvUtils {
String[] segments = splitCsv(text, delimiter, escapeChar, quoteChar);
- String tid = segments[0];
- List<String> fields =
- Arrays.stream(segments, 1,
segments.length).collect(Collectors.toList());
+ String streamId = segments[0];
+
+ Map<String, String> entries;
+ if (segments.length > 1) {
+ entries = splitKv(segments[1], entryDelimiter, kvDelimiter,
escapeChar, quoteChar);
+ } else {
+ entries = Collections.emptyMap();
+ }
- return new InLongMsgBody(bytes, tid, fields, Collections.emptyMap());
+ return new InLongMsgBody(bytes, streamId, Collections.emptyList(),
entries);
}
/**
- * Deserializes the given fields into the row.
+ * Deserializes the row from the given entries.
*
* @param rowFormatInfo The format information of the row.
* @param nullLiteral The literal for null values.
* @param predefinedFields The predefined fields.
- * @param fields The fields.
- * @return The row deserialized from the row.
+ * @param entries The entries.
+ * @return The row deserialized from the given entries.
*/
- public static Row deserializeRow(
+ public static GenericRowData deserializeRowData(
RowFormatInfo rowFormatInfo,
String nullLiteral,
List<String> predefinedFields,
- List<String> fields) {
+ Map<String, String> entries,
+ FieldToRowDataConverter[] converters) {
String[] fieldNames = rowFormatInfo.getFieldNames();
FormatInfo[] fieldFormatInfos = rowFormatInfo.getFieldFormatInfos();
- int actualNumFields = predefinedFields.size() + fields.size();
- if (actualNumFields != fieldNames.length) {
- LOG.warn("The number of fields mismatches: " + fieldNames.length +
- " expected, but was " + actualNumFields + ".");
- }
-
- Row row = new Row(fieldNames.length);
+ GenericRowData rowData = new GenericRowData(fieldNames.length);
for (int i = 0; i < predefinedFields.size(); ++i) {
@@ -129,42 +126,31 @@ public class InLongMsgTlogCsvUtils {
String fieldName = fieldNames[i];
FormatInfo fieldFormatInfo = fieldFormatInfos[i];
-
String fieldText = predefinedFields.get(i);
-
- Object field =
+ FieldToRowDataConverter converter = converters[i];
+ Object field = converter.convert(
deserializeBasicField(
fieldName,
fieldFormatInfo,
fieldText,
- nullLiteral);
- row.setField(i, field);
+ nullLiteral));
+ rowData.setField(i, field);
}
- for (int i = 0; i < fields.size(); ++i) {
-
- if (i + predefinedFields.size() >= fieldNames.length) {
- break;
- }
-
- String fieldName = fieldNames[i + predefinedFields.size()];
- FormatInfo fieldFormatInfo = fieldFormatInfos[i +
predefinedFields.size()];
-
- String fieldText = fields.get(i);
-
- Object field =
- deserializeBasicField(
- fieldName,
- fieldFormatInfo,
- fieldText,
- nullLiteral);
- row.setField(i + predefinedFields.size(), field);
- }
-
- for (int i = predefinedFields.size() + fields.size(); i <
fieldNames.length; ++i) {
- row.setField(i, null);
+ for (int i = predefinedFields.size(); i < fieldNames.length; ++i) {
+ String fieldName = fieldNames[i];
+ FormatInfo fieldFormatInfo = fieldFormatInfos[i];
+ String fieldText = entries.get(fieldName);
+ FieldToRowDataConverter converter = converters[i];
+ Object field = converter.convert(deserializeBasicField(
+ fieldName,
+ fieldFormatInfo,
+ fieldText,
+ nullLiteral));
+ rowData.setField(i, field);
}
- return row;
+ return rowData;
}
+
}
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvValidator.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvValidator.java
new file mode 100644
index 0000000000..8b163a4bb2
--- /dev/null
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvValidator.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsgtlogkv;
+
+import org.apache.inlong.sort.formats.base.TextFormatDescriptorValidator;
+
+import org.apache.flink.table.descriptors.DescriptorProperties;
+
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_DELIMITER;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_ENTRY_DELIMITER;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_KV_DELIMITER;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.validateInLongMsgSchema;
+
+/**
+ * The validator for {@link InLongMsgTlogKv}.
+ */
+public class InLongMsgTlogKvValidator extends TextFormatDescriptorValidator {
+
+ @Override
+ public void validate(DescriptorProperties properties) {
+ super.validate(properties);
+
+ properties.validateString(FORMAT_DELIMITER, true, 1, 1);
+ properties.validateString(FORMAT_ENTRY_DELIMITER, true, 1, 1);
+ properties.validateString(FORMAT_KV_DELIMITER, true, 1, 1);
+
+ validateInLongMsgSchema(properties);
+ }
+}
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
new file mode 100644
index 0000000000..780d0af33b
--- /dev/null
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.inlong.sort.formats.inlongmsgtlogkv.InLongMsgTlogKvFormatFactory
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializerTest.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializerTest.java
new file mode 100644
index 0000000000..d76b63ba41
--- /dev/null
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializerTest.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsgtlogkv;
+
+import org.apache.inlong.common.msg.InLongMsg;
+import org.apache.inlong.sort.formats.base.FieldToRowDataConverters;
+import org.apache.inlong.sort.formats.common.FormatInfo;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.RowFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
+
+import org.apache.flink.api.common.functions.util.ListCollector;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.api.DataTypes.MAP;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_DELIMITER;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_ENTRY_DELIMITER;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_KV_DELIMITER;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsgtlogkv.InLongMsgTlogKvUtils.DEFAULT_INLONGMSG_TLOGKV_CHARSET;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link InLongMsgTlogKvFormatDeserializer}.
+ */
+public class InLongMsgTlogKvFormatDeserializerTest {
+
+ private final FieldToRowDataConverters.FieldToRowDataConverter mapConvert =
+ FieldToRowDataConverters.createConverter(MAP(STRING(),
STRING()).getLogicalType());
+
+ private static final RowFormatInfo TEST_ROW_INFO =
+ new RowFormatInfo(
+ new String[]{"pf1", "pf2", "f1", "f2", "f3"},
+ new FormatInfo[]{
+ IntFormatInfo.INSTANCE,
+ IntFormatInfo.INSTANCE,
+ StringFormatInfo.INSTANCE,
+ StringFormatInfo.INSTANCE,
+ StringFormatInfo.INSTANCE
+ });
+
+ @Test
+ public void testExceptionHandler() throws Exception {
+ TestFailureHandler errorHandler = new TestFailureHandler();
+ InLongMsgTlogKvFormatDeserializer deserializer =
+ new InLongMsgTlogKvFormatDeserializer(
+ TEST_ROW_INFO,
+ DEFAULT_TIME_FIELD_NAME,
+ DEFAULT_ATTRIBUTES_FIELD_NAME,
+ DEFAULT_INLONGMSG_TLOGKV_CHARSET,
+ DEFAULT_DELIMITER,
+ DEFAULT_ENTRY_DELIMITER,
+ DEFAULT_KV_DELIMITER,
+ null,
+ null,
+ null,
+ errorHandler);
+
+ InLongMsg inLongMsg1 = InLongMsg.newInLongMsg(true);
+ String attrs = "m=0&t=20200322&__addcol1_=1&__addcol2_=tes";
+ String body1 = "testInterfaceId1,f1=field1&f2=field2&f3=field3";
+ String body2 = "f1=field1&f2=field2&f3=field3";
+ inLongMsg1.addMsg(attrs, body1.getBytes());
+ inLongMsg1.addMsg(attrs, body2.getBytes());
+
+ List<RowData> actualRowDatas = new ArrayList<>();
+ Collector<RowData> collector = new ListCollector<>(actualRowDatas);
+ deserializer.flatMap(inLongMsg1.buildArray(), collector);
+ assertEquals(2, errorHandler.getRowCount());
+
+ InLongMsg inLongMsg1Head = InLongMsg.newInLongMsg();
+ String abNormalAttrs = "m=0&__addcol1_=1&__addcol2_=tes";
+ inLongMsg1Head.addMsg(abNormalAttrs, body1.getBytes());
+ inLongMsg1Head.addMsg(abNormalAttrs, body2.getBytes());
+ deserializer.flatMap(inLongMsg1Head.buildArray(), collector);
+ assertEquals(1, errorHandler.getHeadCount());
+ }
+
+ @Test
+ public void testDeserialize() throws Exception {
+ InLongMsg inLongMsg1 = InLongMsg.newInLongMsg(true);
+
+ String attrs = "m=0&t=20200322&__addcol1_=1&__addcol2_=2";
+ String body1 = "testInterfaceId1,f1=field1&f2=field2&f3=field3";
+ String body2 = "f1=field1&f2=field2&f3=field3";
+ String body3 = "f1=field1&f2=field2,f1=field1&f2=field2&f3=field3";
+ String body4 = ",testInterfaceId1,f1=field1&f2=field2&f3=field3";
+
+ inLongMsg1.addMsg(attrs, body1.getBytes());
+ inLongMsg1.addMsg(attrs, body2.getBytes());
+ inLongMsg1.addMsg(attrs, body3.getBytes());
+ inLongMsg1.addMsg(attrs, body4.getBytes());
+
+ Map<String, String> expectedAttributes = new HashMap<>();
+ expectedAttributes.put("m", "0");
+ expectedAttributes.put("t", "20200322");
+ expectedAttributes.put("__addcol1_", "1");
+ expectedAttributes.put("__addcol2_", "2");
+
+ GenericRowData expectRowData1 = new GenericRowData(7);
+ expectRowData1.setField(0,
TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00")));
+ expectRowData1.setField(1, mapConvert.convert(expectedAttributes));
+ expectRowData1.setField(2, 1);
+ expectRowData1.setField(3, 2);
+ expectRowData1.setField(4, StringData.fromString("field1"));
+ expectRowData1.setField(5, StringData.fromString("field2"));
+ expectRowData1.setField(6, StringData.fromString("field3"));
+
+ GenericRowData expectRowData2 = new GenericRowData(7);
+ expectRowData2.setField(0,
TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00")));
+ expectRowData2.setField(1, mapConvert.convert(expectedAttributes));
+ expectRowData2.setField(2, 1);
+ expectRowData2.setField(3, 2);
+ expectRowData2.setField(4, null);
+ expectRowData2.setField(5, null);
+ expectRowData2.setField(6, null);
+
+ GenericRowData expectRowData3 = new GenericRowData(7);
+ expectRowData3.setField(0,
TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00")));
+ expectRowData3.setField(1, mapConvert.convert(expectedAttributes));
+ expectRowData3.setField(2, 1);
+ expectRowData3.setField(3, 2);
+ expectRowData3.setField(4, StringData.fromString("field1"));
+ expectRowData3.setField(5, StringData.fromString("field2"));
+ expectRowData3.setField(6, StringData.fromString("field3"));
+
+ GenericRowData expectRowData4 = new GenericRowData(7);
+ expectRowData4.setField(0,
TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00")));
+ expectRowData4.setField(1, mapConvert.convert(expectedAttributes));
+ expectRowData4.setField(2, 1);
+ expectRowData4.setField(3, 2);
+ expectRowData4.setField(4, StringData.fromString("field1"));
+ expectRowData4.setField(5, StringData.fromString("field2"));
+ expectRowData4.setField(6, StringData.fromString("field3"));
+
+ testRowDataDeserialization(inLongMsg1.buildArray(),
+ Arrays.asList(expectRowData1, expectRowData2, expectRowData3,
expectRowData4));
+ }
+
+ private void testRowDataDeserialization(
+ byte[] bytes,
+ List<RowData> expectedRows) throws Exception {
+ InLongMsgTlogKvFormatDeserializer deserializer =
+ new InLongMsgTlogKvFormatDeserializer.Builder(TEST_ROW_INFO)
+ .setTimeFieldName("inlongmsg_time")
+ .setAttributesFieldName("inlongmsg_attributes")
+ .build();
+
+ List<RowData> actualRowDatas = new ArrayList<>();
+ Collector<RowData> collector = new ListCollector<>(actualRowDatas);
+
+ deserializer.flatMap(bytes, collector);
+
+ assertEquals(expectedRows, actualRowDatas);
+ }
+
+ private static class TestFailureHandler implements FailureHandler {
+
+ private int headCount = 0;
+ private int bodyCount = 0;
+ private int rowCount = 0;
+
+ public int getHeadCount() {
+ return headCount;
+ }
+
+ public int getBodyCount() {
+ return bodyCount;
+ }
+
+ public int getRowCount() {
+ return rowCount;
+ }
+
+ @Override
+ public void onParsingHeadFailure(String attribute, Exception
exception) throws Exception {
+ headCount++;
+ }
+
+ @Override
+ public void onParsingBodyFailure(InLongMsgHead head, byte[] body,
Exception exception) throws Exception {
+ bodyCount++;
+ }
+
+ @Override
+ public void onConvertingRowFailure(InLongMsgHead head,
+ InLongMsgBody body, Exception exception) throws Exception {
+ rowCount++;
+ }
+ }
+}
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvTest.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvTest.java
new file mode 100644
index 0000000000..77bcec55f5
--- /dev/null
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsgtlogkv;
+
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgValidator;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.descriptors.Descriptor;
+import org.apache.flink.table.descriptors.DescriptorTestBase;
+import org.apache.flink.table.descriptors.DescriptorValidator;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tests for the {@link InLongMsgTlogKv} descriptor.
+ */
+public class InLongMsgTlogKvTest extends DescriptorTestBase {
+
+ private static final String TEST_SCHEMA =
+ "{" +
+ "\"type\":\"row\"," +
+ "\"fieldFormats\":[{" +
+ "\"name\":\"student_name\"," +
+ "\"format\":{\"type\":\"string\"}" +
+ "},{" +
+ "\"name\":\"score\"," +
+ "\"format\":{\"type\":\"int\"}" +
+ "},{" +
+ "\"name\":\"date\"," +
+ "\"format\":{" +
+ "\"type\":\"date\"," +
+ "\"format\":\"yyyy-MM-dd\"" +
+ "}" +
+ "}]" +
+ "}";
+
+ private static final Descriptor CUSTOM_DESCRIPTOR_WITH_SCHEMA =
+ new InLongMsgTlogKv()
+ .schema(TEST_SCHEMA)
+ .timeFieldName("time")
+ .attributesFieldName("attributes")
+ .delimiter(';')
+ .entryDelimiter('&')
+ .kvDelimiter('=')
+ .charset(StandardCharsets.UTF_8)
+ .escapeCharacter('\\')
+ .quoteCharacter('\"')
+ .nullLiteral("n/a")
+ .ignoreErrors();
+
+ @Test(expected = ValidationException.class)
+ public void testInvalidEscapeCharacter() { // a multi-char escape character must be rejected
+ addPropertyAndVerify(CUSTOM_DESCRIPTOR_WITH_SCHEMA,
"format.escape-character", "DDD");
+ }
+
+ @Test(expected = ValidationException.class)
+ public void testMissingSchema() {
+ removePropertyAndVerify(CUSTOM_DESCRIPTOR_WITH_SCHEMA,
"format.schema");
+ }
+
+ @Override
+ public List<Descriptor> descriptors() {
+ return Collections.singletonList(CUSTOM_DESCRIPTOR_WITH_SCHEMA);
+ }
+
+ @Override
+ public List<Map<String, String>> properties() {
+ final Map<String, String> props1 = new HashMap<>();
+ props1.put("format.type", "inlong-msg-tlogkv");
+ props1.put("format.property-version", "1");
+ props1.put("format.schema", TEST_SCHEMA);
+ props1.put("format.time-field-name", "time");
+ props1.put("format.attribute-field-name", "attributes");
+ props1.put("format.delimiter", ";");
+ props1.put("format.entry-delimiter", "&");
+ props1.put("format.kv-delimiter", "=");
+ props1.put("format.charset", "UTF-8");
+ props1.put("format.escape-character", "\\");
+ props1.put("format.quote-character", "\"");
+ props1.put("format.null-literal", "n/a");
+ props1.put("format.ignore-errors", "true");
+
+ return Collections.singletonList(props1);
+ }
+
+ @Override
+ public DescriptorValidator validator() {
+ return new InLongMsgTlogKvValidator(); // format-specific validator: also checks entry/kv delimiters
+ }
+}
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/test/resources/log4j-test.properties
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000000..881dc0609b
--- /dev/null
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF (test logging disabled) and its only appender to A1.
+log4j.rootLogger=OFF, A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
diff --git a/inlong-sort/sort-formats/format-rowdata/pom.xml
b/inlong-sort/sort-formats/format-rowdata/pom.xml
index daf7b05ef5..6b5f97b8fa 100644
--- a/inlong-sort/sort-formats/format-rowdata/pom.xml
+++ b/inlong-sort/sort-formats/format-rowdata/pom.xml
@@ -42,6 +42,7 @@
<module>format-inlongmsg-rowdata-kv</module>
<module>format-inlongmsg-rowdata-csv</module>
<module>format-inlongmsg-rowdata-tlogcsv</module>
+ <module>format-inlongmsg-rowdata-tlogkv</module>
</modules>
<properties>
diff --git a/inlong-sort/sort-formats/pom.xml b/inlong-sort/sort-formats/pom.xml
index d069c683d0..15804be0c2 100644
--- a/inlong-sort/sort-formats/pom.xml
+++ b/inlong-sort/sort-formats/pom.xml
@@ -110,6 +110,7 @@
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>