This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 241fadef41 [INLONG-9568][Sort] Support rowdata way of sort InLong
message kv format (#9647)
241fadef41 is described below
commit 241fadef41337be482d73c98d0d42eaf36af4729
Author: baomingyu <[email protected]>
AuthorDate: Mon Feb 19 10:04:40 2024 +0800
[INLONG-9568][Sort] Support rowdata way of sort InLong message kv format
(#9647)
Co-authored-by: Charles Zhang <[email protected]>
---
.../sort/formats/base/TableFormatConstants.java | 2 +-
.../format-inlongmsg-rowdata-kv/pom.xml | 90 ++++++
.../inlongmsgkv/InLongMsgKvDecodingFormat.java | 120 +++++++
.../inlongmsgkv/InLongMsgKvFormatDeserializer.java | 352 +++++++++++++++++++++
.../inlongmsgkv/InLongMsgKvFormatFactory.java | 137 ++++++++
.../InLongMsgKvRowDataDeserializationSchema.java | 145 +++++++++
.../sort/formats/inlongmsgkv/InLongMsgKvUtils.java | 171 ++++++++++
.../org.apache.flink.table.factories.Factory | 16 +
.../InLongMsgKvFormatDeserializerTest.java | 345 ++++++++++++++++++++
.../inlongmsgkv/InLongMsgKvFormatFactoryTest.java | 135 ++++++++
.../src/test/resources/log4j-test.properties | 27 ++
inlong-sort/sort-formats/format-rowdata/pom.xml | 1 +
12 files changed, 1540 insertions(+), 1 deletion(-)
diff --git
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/TableFormatConstants.java
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/TableFormatConstants.java
index 42b367c2fb..a1ff573d1c 100644
---
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/TableFormatConstants.java
+++
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/TableFormatConstants.java
@@ -41,7 +41,7 @@ public class TableFormatConstants {
public static final String FORMAT_PROPERTY_VERSION =
"format.property-version";
public static final String FORMAT_FIELD_DELIMITER =
"format.field-delimiter";
public static final String FORMAT_TIME_FIELD_NAME =
"format.time-field-name";
- public static final String FORMAT_KV_ENTRY_DELIMITER = "entry-delimiter";
+ public static final String FORMAT_KV_ENTRY_DELIMITER =
"format.entry-delimiter";
public static final String FORMAT_ATTRIBUTE_FIELD_NAME =
"format.attribute-field-name";
public static final String FORMAT_IS_MIXED = "format.is-mixed";
public static final String FORMAT_DELETE_HEAD_DELIMITER =
"format.delete-head-delimiter";
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/pom.xml
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/pom.xml
new file mode 100644
index 0000000000..a80170d90e
--- /dev/null
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/pom.xml
@@ -0,0 +1,90 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>format-rowdata</artifactId>
+ <version>1.12.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>sort-format-inlongmsg-rowdata-kv</artifactId>
+ <packaging>jar</packaging>
+ <name>Apache InLong - Sort Format-InLongMsg-RowData-KV</name>
+
+ <properties>
+
<inlong.root.dir>${project.parent.parent.parent.parent.basedir}</inlong.root.dir>
+ </properties>
+
+ <dependencies>
+ <dependency> <!-- base InLongMsg row-data format module that this KV format builds on -->
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-format-inlongmsg-rowdata-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency> <!-- provided scope: expected on the runtime classpath, not bundled -->
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- test dependencies -->
+ <!-- Flink table-common test-jar (table test utilities) -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
+ <version>${flink.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- Flink table planner, needed on the test classpath -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- Flink test utilities; log4j-slf4j-impl is excluded, presumably to avoid a second SLF4J binding -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency> <!-- Flink table planner test-jar (planner test utilities) -->
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvDecodingFormat.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvDecodingFormat.java
new file mode 100644
index 0000000000..0e063482ce
--- /dev/null
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvDecodingFormat.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsgkv;
+
+import
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgDecodingFormat;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+
+import static
org.apache.inlong.sort.formats.base.TableFormatOptions.ROW_FORMAT_INFO;
+import static
org.apache.inlong.sort.formats.base.TableFormatUtils.deserializeRowFormatInfo;
+import static
org.apache.inlong.sort.formats.base.TextFormatOptions.KV_DELIMITER;
+import static
org.apache.inlong.sort.formats.base.TextFormatOptions.KV_ENTRY_DELIMITER;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.ATTRIBUTE_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.CHARSET;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.ESCAPE_CHARACTER;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.IGNORE_ERRORS;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.LINE_DELIMITER;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.NULL_LITERAL;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.QUOTE_CHARACTER;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.RETAIN_PREDEFINED_FIELD;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.TIME_FIELD_NAME;
+
+/**
+ * Decoding format for InLongMsg messages whose bodies are in KV (key-value) text format.
+ *
+ * <p>{@link #createRuntimeDecoder} builds an {@link InLongMsgKvRowDataDeserializationSchema}
+ * whose row schema is read from the required {@code ROW_FORMAT_INFO} option; the
+ * {@code dataType} and {@code context} arguments are not used. The remaining settings are
+ * copied from the format options. Only INSERT rows are produced (see {@link #getChangelogMode}).
+ *
+ * <p>Option handling:
+ * <ul>
+ *   <li>{@code CHARSET} is always set, falling back to the option's default value.</li>
+ *   <li>The entry, KV and line delimiters are Java-unescaped (an option value of {@code \t}
+ *       becomes a tab character) and only their first character is used.</li>
+ *   <li>The quote and escape characters are used literally: first character, no unescaping.</li>
+ *   <li>Every other option is applied to the builder only when it is present.</li>
+ * </ul>
+ *
+ * <p>NOTE(review): {@code charAt(0)} throws on an empty value; this relies on
+ * {@code InLongMsgKvFormatFactory} validating the single-character options before it creates
+ * this format.
+ */
+public class InLongMsgKvDecodingFormat extends AbstractInLongMsgDecodingFormat
{
+
+ private final ReadableConfig formatOptions; // read when the runtime decoder is created
+
+ public InLongMsgKvDecodingFormat(ReadableConfig formatOptions) {
+ this.formatOptions = formatOptions;
+ }
+
+ @Override
+ public DeserializationSchema<RowData> createRuntimeDecoder(
+ DynamicTableSource.Context context, DataType dataType) { // neither argument is used
+ InLongMsgKvRowDataDeserializationSchema.Builder builder =
+ new InLongMsgKvRowDataDeserializationSchema.Builder(
+
deserializeRowFormatInfo(formatOptions.get(ROW_FORMAT_INFO)));
+ configureDeserializationSchema(formatOptions, builder);
+ return builder.build();
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode() { // this format only emits INSERT rows
+ return ChangelogMode.insertOnly();
+ }
+
+ private void configureDeserializationSchema(
+ ReadableConfig formatOptions,
+ InLongMsgKvRowDataDeserializationSchema.Builder schemaBuilder) {
+
schemaBuilder.setCharset(formatOptions.getOptional(CHARSET).orElse(CHARSET.defaultValue())); // always set; falls back to CHARSET's default
+
+ formatOptions
+ .getOptional(TIME_FIELD_NAME)
+ .ifPresent(schemaBuilder::setTimeFieldName);
+
+ formatOptions
+ .getOptional(ATTRIBUTE_FIELD_NAME)
+ .ifPresent(schemaBuilder::setAttributesFieldName);
+
+ formatOptions
+ .getOptional(RETAIN_PREDEFINED_FIELD)
+ .ifPresent(schemaBuilder::setRetainPredefinedField);
+
+ formatOptions // delimiters: Java-unescaped, first character used
+ .getOptional(KV_ENTRY_DELIMITER)
+ .map(delimiter ->
StringEscapeUtils.unescapeJava(delimiter).charAt(0))
+ .ifPresent(schemaBuilder::setEntryDelimiter);
+
+ formatOptions
+ .getOptional(KV_DELIMITER)
+ .map(delimiter ->
StringEscapeUtils.unescapeJava(delimiter).charAt(0))
+ .ifPresent(schemaBuilder::setKvDelimiter);
+
+ formatOptions
+ .getOptional(LINE_DELIMITER)
+ .map(delimiter ->
StringEscapeUtils.unescapeJava(delimiter).charAt(0))
+ .ifPresent(schemaBuilder::setLineDelimiter);
+
+ formatOptions // quote/escape characters are taken literally (no unescaping)
+ .getOptional(QUOTE_CHARACTER)
+ .map(quote -> quote.charAt(0))
+ .ifPresent(schemaBuilder::setQuoteCharacter);
+
+ formatOptions
+ .getOptional(ESCAPE_CHARACTER)
+ .map(escape -> escape.charAt(0))
+ .ifPresent(schemaBuilder::setEscapeCharacter);
+
+ formatOptions
+ .getOptional(NULL_LITERAL)
+ .ifPresent(schemaBuilder::setNullLiteral);
+
+ formatOptions
+ .getOptional(IGNORE_ERRORS)
+ .ifPresent(schemaBuilder::setIgnoreErrors);
+ }
+}
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializer.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializer.java
new file mode 100644
index 0000000000..a125a2786d
--- /dev/null
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializer.java
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsgkv;
+
+import org.apache.inlong.sort.formats.base.FieldToRowDataConverters;
+import
org.apache.inlong.sort.formats.base.FieldToRowDataConverters.FieldToRowDataConverter;
+import org.apache.inlong.sort.formats.base.TableFormatUtils;
+import org.apache.inlong.sort.formats.base.TextFormatBuilder;
+import org.apache.inlong.sort.formats.common.RowFormatInfo;
+import
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgFormatDeserializer;
+import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_ENTRY_DELIMITER;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_KV_DELIMITER;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_LINE_DELIMITER;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
+
+/**
+ * The deserializer for the records in InLongMsgKv format.
+ *
+ * <p>It supplies the hooks of {@link AbstractInLongMsgFormatDeserializer}: {@code parseHead}
+ * delegates to {@link InLongMsgKvUtils#parseHead}; {@code parseBodyList} splits the body bytes
+ * with {@link InLongMsgKvUtils#parseBodyList} using the configured charset, delimiters, escape
+ * and quote characters; {@code convertRowDataList} turns each body into exactly one row via
+ * {@link InLongMsgKvUtils#deserializeRowData} (one {@link FieldToRowDataConverter} per field of
+ * {@code rowFormatInfo}) and then decorates it with the head's time and attributes through
+ * {@code InLongMsgUtils.decorateRowDataWithNeededHeadFields}.
+ *
+ * <p>A {@link Builder} is provided for construction.
+ */
+public final class InLongMsgKvFormatDeserializer extends
AbstractInLongMsgFormatDeserializer {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Format information describing the result type; also used to build {@code converters}.
+ */
+ @Nonnull
+ private final RowFormatInfo rowFormatInfo;
+
+ /**
+ * The name of the field under which the head's time is added to each row.
+ * Passed through (possibly null) to {@code InLongMsgUtils}, which decides how it is handled.
+ */
+ @Nullable
+ private final String timeFieldName;
+
+ /**
+ * The name of the field under which the head's attributes are added to each row.
+ * Passed through (possibly null) to {@code InLongMsgUtils}, which decides how it is handled.
+ */
+ @Nullable
+ private final String attributesFieldName;
+
+ /**
+ * The charset used to decode the body bytes.
+ */
+ @Nonnull
+ private final String charset;
+
+ /**
+ * The delimiter between KV entries.
+ */
+ @Nonnull
+ private final Character entryDelimiter;
+
+ /**
+ * The delimiter between key and value.
+ */
+ @Nonnull
+ private final Character kvDelimiter;
+
+ /**
+ * The delimiter between lines of a body. May be null; how null is treated is decided by
+ * {@link InLongMsgKvUtils#parseBodyList}.
+ */
+ @Nullable
+ private final Character lineDelimiter;
+
+ /**
+ * Escape character. Null if escaping is disabled.
+ */
+ @Nullable
+ private final Character escapeChar;
+
+ /**
+ * Quote character. Null if quoting is disabled.
+ */
+ @Nullable
+ private final Character quoteChar;
+
+ /**
+ * The literal that represents null values; may be null. NOTE(review): this class applies
+ * no default itself, so any default (e.g. "") must come from the caller or builder.
+ */
+ @Nullable
+ private final String nullLiteral;
+
+ private final FieldToRowDataConverter[] converters; // one per field of rowFormatInfo, in order
+
+ /**
+ * Whether the predefined fields of the message head are passed to
+ * {@link InLongMsgKvUtils#deserializeRowData}; when false an empty list is passed instead.
+ * Defaults to true. Not final because only the constructor that takes a {@code Boolean}
+ * {@code retainPredefinedField} overrides it, after delegating to the main constructor.
+ */
+ private boolean retainPredefinedField = true;
+
+ public InLongMsgKvFormatDeserializer( // ignoreErrors picks the failure handler; null retainPredefinedField keeps true
+ @Nonnull RowFormatInfo rowFormatInfo,
+ @Nullable String timeFieldName,
+ @Nullable String attributesFieldName,
+ @Nonnull String charset,
+ @Nonnull Character entryDelimiter,
+ @Nonnull Character kvDelimiter,
+ @Nullable Character lineDelimiter,
+ @Nullable Character escapeChar,
+ @Nullable Character quoteChar,
+ @Nullable String nullLiteral,
+ @Nonnull Boolean ignoreErrors,
+ @Nullable Boolean retainPredefinedField) {
+ this(
+ rowFormatInfo,
+ timeFieldName,
+ attributesFieldName,
+ charset,
+ entryDelimiter,
+ kvDelimiter,
+ lineDelimiter,
+ escapeChar,
+ quoteChar,
+ nullLiteral,
+ InLongMsgUtils.getDefaultExceptionHandler(ignoreErrors));
+ if (retainPredefinedField != null) {
+ this.retainPredefinedField = retainPredefinedField;
+ }
+ }
+
+ public InLongMsgKvFormatDeserializer( // as above, with retainPredefinedField left at true
+ @Nonnull RowFormatInfo rowFormatInfo,
+ @Nullable String timeFieldName,
+ @Nullable String attributesFieldName,
+ @Nonnull String charset,
+ @Nonnull Character entryDelimiter,
+ @Nonnull Character kvDelimiter,
+ @Nullable Character lineDelimiter,
+ @Nullable Character escapeChar,
+ @Nullable Character quoteChar,
+ @Nullable String nullLiteral,
+ @Nonnull Boolean ignoreErrors) {
+ this(
+ rowFormatInfo,
+ timeFieldName,
+ attributesFieldName,
+ charset,
+ entryDelimiter,
+ kvDelimiter,
+ lineDelimiter,
+ escapeChar,
+ quoteChar,
+ nullLiteral,
+ InLongMsgUtils.getDefaultExceptionHandler(ignoreErrors));
+ }
+
+ public InLongMsgKvFormatDeserializer( // main constructor: stores settings, builds one converter per field
+ @Nonnull RowFormatInfo rowFormatInfo,
+ @Nullable String timeFieldName,
+ @Nullable String attributesFieldName,
+ @Nonnull String charset,
+ @Nonnull Character entryDelimiter,
+ @Nonnull Character kvDelimiter,
+ @Nullable Character lineDelimiter,
+ @Nullable Character escapeChar,
+ @Nullable Character quoteChar,
+ @Nullable String nullLiteral,
+ @Nonnull FailureHandler failureHandler) {
+ super(failureHandler);
+
+ this.rowFormatInfo = rowFormatInfo;
+ this.timeFieldName = timeFieldName;
+ this.attributesFieldName = attributesFieldName;
+ this.charset = charset;
+ this.entryDelimiter = entryDelimiter;
+ this.kvDelimiter = kvDelimiter;
+ this.lineDelimiter = lineDelimiter;
+ this.escapeChar = escapeChar;
+ this.quoteChar = quoteChar;
+ this.nullLiteral = nullLiteral;
+
+ converters = Arrays.stream(rowFormatInfo.getFieldFormatInfos())
+ .map(formatInfo -> FieldToRowDataConverters.createConverter(
+ TableFormatUtils.deriveLogicalType(formatInfo)))
+
.toArray(FieldToRowDataConverters.FieldToRowDataConverter[]::new);
+ }
+
+ @Override
+ public TypeInformation<RowData> getProducedType() { // row type plus head fields; empty list presumably = no metadata columns
+
+ return InLongMsgUtils.decorateRowTypeWithNeededHeadFieldsAndMetadata(
+ timeFieldName,
+ attributesFieldName,
+ rowFormatInfo,
+ Collections.emptyList());
+ }
+
+ @Override
+ protected InLongMsgHead parseHead(String attr) { // delegates to InLongMsgKvUtils
+ return InLongMsgKvUtils.parseHead(attr);
+ }
+
+ @Override
+ protected List<InLongMsgBody> parseBodyList(byte[] bytes) { // delegates to InLongMsgKvUtils
+ return InLongMsgKvUtils.parseBodyList(
+ bytes,
+ charset,
+ entryDelimiter,
+ kvDelimiter,
+ lineDelimiter,
+ escapeChar,
+ quoteChar);
+ }
+
+ @Override
+ protected List<RowData> convertRowDataList(InLongMsgHead head,
InLongMsgBody body) { // always yields exactly one row per body
+ GenericRowData genericRowData = InLongMsgKvUtils.deserializeRowData(
+ rowFormatInfo,
+ nullLiteral,
+ retainPredefinedField ? head.getPredefinedFields() :
Collections.emptyList(),
+ body.getEntries(),
+ converters);
+
+ // Decorate result with time and attributes fields if needed
+ return
Collections.singletonList(InLongMsgUtils.decorateRowDataWithNeededHeadFields(
+ timeFieldName,
+ attributesFieldName,
+ head.getTime(),
+ head.getAttributes(),
+ genericRowData));
+ }
+
+ /**
+ * The builder for {@link InLongMsgKvFormatDeserializer}.
+ *
+ * <p>Time and attributes field names default to {@code DEFAULT_TIME_FIELD_NAME} and
+ * {@code DEFAULT_ATTRIBUTES_FIELD_NAME}; the entry, KV and line delimiters default to the
+ * corresponding {@code TableFormatConstants} values. Charset, escape and quote characters,
+ * null literal and ignoreErrors are inherited from {@link TextFormatBuilder}. If
+ * {@code retainPredefinedField} is never set it stays null, and the deserializer then keeps
+ * its own default of {@code true}.
+ */
+ public static class Builder extends TextFormatBuilder<Builder> {
+
+ private String timeFieldName = DEFAULT_TIME_FIELD_NAME;
+ private String attributesFieldName = DEFAULT_ATTRIBUTES_FIELD_NAME;
+ private Character entryDelimiter = DEFAULT_ENTRY_DELIMITER;
+ private Character kvDelimiter = DEFAULT_KV_DELIMITER;
+ private Character lineDelimiter = DEFAULT_LINE_DELIMITER;
+ private Boolean retainPredefinedField;
+
+ public Builder(RowFormatInfo rowFormatInfo) {
+ super(rowFormatInfo);
+ }
+
+ public Builder setTimeFieldName(String timeFieldName) {
+ this.timeFieldName = timeFieldName;
+ return this;
+ }
+
+ public Builder setAttributesFieldName(String attributesFieldName) {
+ this.attributesFieldName = attributesFieldName;
+ return this;
+ }
+
+ public Builder setEntryDelimiter(Character entryDelimiter) {
+ this.entryDelimiter = entryDelimiter;
+ return this;
+ }
+
+ public Builder setKvDelimiter(Character kvDelimiter) {
+ this.kvDelimiter = kvDelimiter;
+ return this;
+ }
+
+ public Builder setLineDelimiter(Character lineDelimiter) {
+ this.lineDelimiter = lineDelimiter;
+ return this;
+ }
+
+ public Builder setRetainPredefinedField(Boolean retainPredefinedField)
{
+ this.retainPredefinedField = retainPredefinedField;
+ return this;
+ }
+
+ public InLongMsgKvFormatDeserializer build() {
+ return new InLongMsgKvFormatDeserializer(
+ rowFormatInfo,
+ timeFieldName,
+ attributesFieldName,
+ charset,
+ entryDelimiter,
+ kvDelimiter,
+ lineDelimiter,
+ escapeChar,
+ quoteChar,
+ nullLiteral,
+ ignoreErrors,
+ retainPredefinedField);
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) { // converters are derived from rowFormatInfo, so not compared
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ if (!super.equals(o)) {
+ return false;
+ }
+
+ InLongMsgKvFormatDeserializer that = (InLongMsgKvFormatDeserializer) o;
+ return rowFormatInfo.equals(that.rowFormatInfo) &&
+ Objects.equals(timeFieldName, that.timeFieldName) &&
+ Objects.equals(attributesFieldName, that.attributesFieldName)
&&
+ charset.equals(that.charset) &&
+ entryDelimiter.equals(that.entryDelimiter) &&
+ kvDelimiter.equals(that.kvDelimiter) &&
+ Objects.equals(lineDelimiter, that.lineDelimiter) &&
+ Objects.equals(escapeChar, that.escapeChar) &&
+ Objects.equals(quoteChar, that.quoteChar) &&
+ Objects.equals(nullLiteral, that.nullLiteral) &&
+ Objects.equals(retainPredefinedField,
that.retainPredefinedField);
+ }
+
+ @Override
+ public int hashCode() { // same fields as equals, including super's hash
+ return Objects.hash(super.hashCode(), rowFormatInfo, timeFieldName,
+ attributesFieldName, charset, entryDelimiter, kvDelimiter,
lineDelimiter,
+ escapeChar, quoteChar, nullLiteral, retainPredefinedField);
+ }
+}
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatFactory.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatFactory.java
new file mode 100644
index 0000000000..3b2007a946
--- /dev/null
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatFactory.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsgkv;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static
org.apache.inlong.sort.formats.base.TableFormatOptions.ROW_FORMAT_INFO;
+import static
org.apache.inlong.sort.formats.base.TextFormatOptions.KV_DELIMITER;
+import static
org.apache.inlong.sort.formats.base.TextFormatOptions.KV_ENTRY_DELIMITER;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.ATTRIBUTE_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.CHARSET;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.ESCAPE_CHARACTER;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.IGNORE_ERRORS;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.LINE_DELIMITER;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.NULL_LITERAL;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.QUOTE_CHARACTER;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.RETAIN_PREDEFINED_FIELD;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.TIME_FIELD_NAME;
+
+/**
+ * Table format factory for providing configured instances of
InLongMsgKv-to-rowData
+ * deserializers. Only decoding is supported: this factory implements
+ * {@link DeserializationFormatFactory} and has no serialization counterpart.
+ *
+ * <p>{@code ROW_FORMAT_INFO} is the only required option; {@link #optionalOptions()} lists
+ * the optional ones. Single-character options are validated before the
+ * {@link InLongMsgKvDecodingFormat} is created.
+ */
+public final class InLongMsgKvFormatFactory implements
DeserializationFormatFactory {
+
+ public static final String IDENTIFIER = "inlong-msg-kv"; // returned by factoryIdentifier()
+
+ @Override
+ public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
+ DynamicTableFactory.Context context, ReadableConfig formatOptions)
{
+
+ FactoryUtil.validateFactoryOptions(this, formatOptions); // Flink's check of required/optional options
+ validateFormatOptions(formatOptions); // single-character checks
+
+ return new InLongMsgKvDecodingFormat(formatOptions);
+ }
+
+ // ------------------------------------------------------------------------
+ // Validation
+ // ------------------------------------------------------------------------
+
+ static void validateFormatOptions(ReadableConfig tableOptions) {
+ // When present, each value must be one char; delimiters are checked after Java-unescaping.
+ validateCharacterVal(tableOptions, KV_ENTRY_DELIMITER, true);
+ validateCharacterVal(tableOptions, KV_DELIMITER, true);
+ validateCharacterVal(tableOptions, LINE_DELIMITER, true);
+ validateCharacterVal(tableOptions, QUOTE_CHARACTER);
+ validateCharacterVal(tableOptions, ESCAPE_CHARACTER);
+ }
+
+ /**
+ * Validates that the value of {@code option}, if present, is exactly one character,
+ * checked as-is (without Java-unescaping).
+ */
+ private static void validateCharacterVal(
+ ReadableConfig tableOptions, ConfigOption<String> option) {
+ validateCharacterVal(tableOptions, option, false);
+ }
+
+ /**
+ * Validates that the value of {@code option}, if present, is exactly one character.
+ * Absent options are accepted.
+ *
+ * @param tableOptions the table options
+ * @param option the config option
+ * @param unescape whether to Java-unescape the option value before checking its length
+ * @throws ValidationException if the (possibly unescaped) value is not exactly one character;
+ *         the message reports the raw option value as configured
+ */
+ private static void validateCharacterVal(
+ ReadableConfig tableOptions, ConfigOption<String> option, boolean
unescape) {
+ if (tableOptions.getOptional(option).isPresent()) {
+ final String value =
+ unescape
+ ?
StringEscapeUtils.unescapeJava(tableOptions.get(option))
+ : tableOptions.get(option);
+ if (value.length() != 1) {
+ throw new ValidationException(
+ String.format(
+ "Option '%s.%s' must be a string with single
character, but was: %s",
+ IDENTIFIER, option.key(),
tableOptions.get(option)));
+ }
+ }
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ return Stream.of(ROW_FORMAT_INFO).collect(Collectors.toSet()); // the row schema option
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() { // every other option InLongMsgKvDecodingFormat reads
+ Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(TIME_FIELD_NAME);
+ options.add(ATTRIBUTE_FIELD_NAME);
+ options.add(CHARSET);
+ options.add(KV_ENTRY_DELIMITER);
+ options.add(KV_DELIMITER);
+ options.add(LINE_DELIMITER);
+ options.add(QUOTE_CHARACTER);
+ options.add(ESCAPE_CHARACTER);
+ options.add(NULL_LITERAL);
+ options.add(IGNORE_ERRORS);
+ options.add(RETAIN_PREDEFINED_FIELD);
+ return options;
+ }
+}
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvRowDataDeserializationSchema.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvRowDataDeserializationSchema.java
new file mode 100644
index 0000000000..caa27078c0
--- /dev/null
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvRowDataDeserializationSchema.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsgkv;
+
+import org.apache.inlong.sort.formats.common.RowFormatInfo;
+import
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgDeserializationSchema;
+import
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgFormatDeserializer;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_CHARSET;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_ENTRY_DELIMITER;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_ESCAPE_CHARACTER;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_KV_DELIMITER;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_LINE_DELIMITER;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_NULL_LITERAL;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_QUOTE_CHARACTER;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_RETAIN_PREDEFINED_FIELD;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
+
+/**
+ * Deserialization schema from InLongMsg-KV to Flink Table & SQL internal data
structures.
+ */
+public class InLongMsgKvRowDataDeserializationSchema extends
AbstractInLongMsgDeserializationSchema {
+
+ private static final long serialVersionUID = 1L;
+
+ public
InLongMsgKvRowDataDeserializationSchema(AbstractInLongMsgFormatDeserializer
formatDeserializer) {
+ super(formatDeserializer);
+ }
+
+ /**
+ * A builder for creating a {@link
InLongMsgKvRowDataDeserializationSchema}.
+ */
+ @PublicEvolving
+ public static class Builder {
+
+ private final RowFormatInfo rowFormatInfo;
+
+ private String timeFieldName = DEFAULT_TIME_FIELD_NAME;
+ private String attributesFieldName = DEFAULT_ATTRIBUTES_FIELD_NAME;
+ private String charset = DEFAULT_CHARSET;
+ private Character entryDelimiter = DEFAULT_ENTRY_DELIMITER;
+ private Character kvDelimiter = DEFAULT_KV_DELIMITER;
+ private Character escapeChar = DEFAULT_ESCAPE_CHARACTER;
+ private Character quoteChar = DEFAULT_QUOTE_CHARACTER;
+ private String nullLiteral = DEFAULT_NULL_LITERAL;
+ private Character lineDelimiter = DEFAULT_LINE_DELIMITER;
+ private boolean retainPredefinedField =
DEFAULT_RETAIN_PREDEFINED_FIELD;
+ private boolean ignoreErrors = false;
+
+ protected Builder(RowFormatInfo rowFormatInfo) {
+ this.rowFormatInfo = rowFormatInfo;
+ }
+
+ public Builder setTimeFieldName(String timeFieldName) {
+ this.timeFieldName = timeFieldName;
+ return this;
+ }
+
+ public Builder setAttributesFieldName(String attributesFieldName) {
+ this.attributesFieldName = attributesFieldName;
+ return this;
+ }
+
+ public Builder setEntryDelimiter(Character entryDelimiter) {
+ this.entryDelimiter = entryDelimiter;
+ return this;
+ }
+
+ public Builder setKvDelimiter(Character kvDelimiter) {
+ this.kvDelimiter = kvDelimiter;
+ return this;
+ }
+
+ public Builder setCharset(String charset) {
+ this.charset = charset;
+ return this;
+ }
+
+ public Builder setEscapeCharacter(Character escapeChar) {
+ this.escapeChar = escapeChar;
+ return this;
+ }
+
+ public Builder setQuoteCharacter(Character quoteChar) {
+ this.quoteChar = quoteChar;
+ return this;
+ }
+
+ public Builder setNullLiteral(String nullLiteral) {
+ this.nullLiteral = nullLiteral;
+ return this;
+ }
+
+ public Builder setRetainPredefinedField(Boolean retainPredefinedField)
{
+ this.retainPredefinedField = retainPredefinedField;
+ return this;
+ }
+
+ public Builder setLineDelimiter(Character lineDelimiter) {
+ this.lineDelimiter = lineDelimiter;
+ return this;
+ }
+
+ public Builder setIgnoreErrors(Boolean ignoreErrors) {
+ this.ignoreErrors = ignoreErrors;
+ return this;
+ }
+
+ public InLongMsgKvRowDataDeserializationSchema build() {
+ AbstractInLongMsgFormatDeserializer formatDeserializer = new
InLongMsgKvFormatDeserializer(
+ rowFormatInfo,
+ timeFieldName,
+ attributesFieldName,
+ charset,
+ entryDelimiter,
+ kvDelimiter,
+ lineDelimiter,
+ escapeChar,
+ quoteChar,
+ nullLiteral,
+ ignoreErrors,
+ retainPredefinedField);
+
+ return new
InLongMsgKvRowDataDeserializationSchema(formatDeserializer);
+ }
+ }
+}
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
new file mode 100644
index 0000000000..c6f1eaf771
--- /dev/null
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsgkv;
+
+import
org.apache.inlong.sort.formats.base.FieldToRowDataConverters.FieldToRowDataConverter;
+import org.apache.inlong.sort.formats.common.FormatInfo;
+import org.apache.inlong.sort.formats.common.RowFormatInfo;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
+
+import org.apache.flink.table.data.GenericRowData;
+
+import java.nio.charset.Charset;
+import java.sql.Timestamp;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static
org.apache.inlong.sort.formats.base.TableFormatUtils.deserializeBasicField;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_INTERFACE_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_INTERFACE_TID;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_STREAMID;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TIME_DT;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TIME_T;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.getPredefinedFields;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseAttr;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseDateTime;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseEpochTime;
+import static org.apache.inlong.sort.formats.util.StringUtils.splitKv;
+
+/**
+ * Utilities for InLongMsgKv.
+ */
+public class InLongMsgKvUtils {
+
+ public static InLongMsgHead parseHead(String attr) {
+ Map<String, String> attributes = parseAttr(attr);
+
+ String streamId;
+ if (attributes.containsKey(INLONGMSG_ATTR_STREAMID)) {
+ streamId = attributes.get(INLONGMSG_ATTR_STREAMID);
+ }
+ if (attributes.containsKey(INLONGMSG_ATTR_INTERFACE_NAME)) {
+ streamId = attributes.get(INLONGMSG_ATTR_INTERFACE_NAME);
+ } else if (attributes.containsKey(INLONGMSG_ATTR_INTERFACE_TID)) {
+ streamId = attributes.get(INLONGMSG_ATTR_INTERFACE_TID);
+ } else {
+ throw new IllegalArgumentException(
+ "Could not find " + INLONGMSG_ATTR_STREAMID
+ + " or " + INLONGMSG_ATTR_INTERFACE_NAME
+ + " or " + INLONGMSG_ATTR_INTERFACE_TID + " in
attributes!");
+ }
+
+ Timestamp time;
+ if (attributes.containsKey(INLONGMSG_ATTR_TIME_DT)) {
+ String epoch = attributes.get(INLONGMSG_ATTR_TIME_DT).trim();
+ time = parseEpochTime(epoch);
+ } else if (attributes.containsKey(INLONGMSG_ATTR_TIME_T)) {
+ String date = attributes.get(INLONGMSG_ATTR_TIME_T).trim();
+ time = parseDateTime(date);
+ } else {
+ throw new IllegalArgumentException(
+ "Could not find " + INLONGMSG_ATTR_TIME_T +
+ " or " + INLONGMSG_ATTR_TIME_DT + " in
attributes!");
+ }
+
+ List<String> predefinedFields = getPredefinedFields(attributes);
+
+ return new InLongMsgHead(attributes, streamId, time, predefinedFields);
+ }
+
+ public static List<InLongMsgBody> parseBodyList(
+ byte[] bytes,
+ String charset,
+ char entryDelimiter,
+ char kvDelimiter,
+ Character lineDelimiter,
+ Character escapeChar,
+ Character quoteChar) {
+ String text = new String(bytes, Charset.forName(charset));
+
+ List<Map<String, String>> list =
+ splitKv(
+ text,
+ entryDelimiter,
+ kvDelimiter,
+ escapeChar,
+ quoteChar,
+ lineDelimiter);
+
+ return list.stream().map((line) -> new InLongMsgBody(
+ bytes,
+ null,
+ Collections.emptyList(),
+ line)).collect(Collectors.toList());
+ }
+
+ /**
+ * Deserializes the row from the given entries.
+ *
+ * @param rowFormatInfo The format of the fields.
+ * @param nullLiteral The literal for null values.
+ * @param predefinedFields The predefined fields.
+ * @param entries The entries.
+ * @return The row deserialized from the given entries.
+ */
+ public static GenericRowData deserializeRowData(
+ RowFormatInfo rowFormatInfo,
+ String nullLiteral,
+ List<String> predefinedFields,
+ Map<String, String> entries,
+ FieldToRowDataConverter[] converters) {
+ String[] fieldNames = rowFormatInfo.getFieldNames();
+ FormatInfo[] fieldFormatInfos = rowFormatInfo.getFieldFormatInfos();
+
+ GenericRowData row = new GenericRowData(fieldNames.length);
+
+ for (int i = 0; i < predefinedFields.size(); ++i) {
+
+ if (i >= fieldNames.length) {
+ break;
+ }
+
+ String fieldName = fieldNames[i];
+ FormatInfo fieldFormatInfo = fieldFormatInfos[i];
+ FieldToRowDataConverter converter = converters[i];
+ String fieldText = predefinedFields.get(i);
+
+ Object field = converter.convert(
+ deserializeBasicField(
+ fieldName,
+ fieldFormatInfo,
+ fieldText,
+ nullLiteral));
+ row.setField(i, field);
+ }
+
+ for (int i = predefinedFields.size(); i < fieldNames.length; ++i) {
+
+ String fieldName = fieldNames[i];
+ FormatInfo fieldFormatInfo = fieldFormatInfos[i];
+ FieldToRowDataConverter converter = converters[i];
+ String fieldText = entries.get(fieldName);
+
+ Object field = converter.convert(deserializeBasicField(
+ fieldName,
+ fieldFormatInfo,
+ fieldText,
+ nullLiteral));
+ row.setField(i, field);
+ }
+
+ return row;
+ }
+}
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 0000000000..4b312acf4a
--- /dev/null
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.inlong.sort.formats.inlongmsgkv.InLongMsgKvFormatFactory
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializerTest.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializerTest.java
new file mode 100644
index 0000000000..3332bd21f0
--- /dev/null
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializerTest.java
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsgkv;
+
+import org.apache.inlong.common.msg.InLongMsg;
+import org.apache.inlong.sort.formats.base.FieldToRowDataConverters;
+import org.apache.inlong.sort.formats.common.FormatInfo;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.RowFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
+
+import org.apache.flink.api.common.functions.util.ListCollector;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.api.DataTypes.MAP;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_CHARSET;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_ENTRY_DELIMITER;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_KV_DELIMITER;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit tests for {@link InLongMsgKvFormatDeserializer}.
+ */
+public class InLongMsgKvFormatDeserializerTest {
+
+ private final FieldToRowDataConverters.FieldToRowDataConverter mapConvert =
+ FieldToRowDataConverters.createConverter(MAP(STRING(),
STRING()).getLogicalType());
+
+ private static final RowFormatInfo TEST_ROW_INFO =
+ new RowFormatInfo(
+ new String[]{"pf1", "pf2", "f1", "f2", "f3", "f4"},
+ new FormatInfo[]{
+ IntFormatInfo.INSTANCE,
+ IntFormatInfo.INSTANCE,
+ IntFormatInfo.INSTANCE,
+ StringFormatInfo.INSTANCE,
+ StringFormatInfo.INSTANCE,
+ StringFormatInfo.INSTANCE
+ });
+
+ @Test
+ public void testExceptionHandler() throws Exception {
+ TestFailureHandler errorHandler = new TestFailureHandler();
+ InLongMsgKvFormatDeserializer deserializer =
+ new InLongMsgKvFormatDeserializer(
+ TEST_ROW_INFO,
+ "inlongmsg_time",
+ "inlongmsg_attributes",
+ DEFAULT_CHARSET,
+ DEFAULT_ENTRY_DELIMITER,
+ DEFAULT_KV_DELIMITER,
+ null,
+ null,
+ null,
+ null,
+ errorHandler);
+
+ InLongMsg inLongMsg = InLongMsg.newInLongMsg();
+ String attrs =
"m=0&iname=testInterfaceId&t=20200322&__addcol1__=1&__addcol2__=2";
+ String body1 = "f1=123&f2=field11&f3=field12&f4=field13";
+ String body2 = "f1=errormsg&f2=field21&f3=field22&f4=field23";
+ inLongMsg.addMsg(attrs, body1.getBytes());
+ inLongMsg.addMsg(attrs, body2.getBytes());
+
+ List<RowData> actualRows = new ArrayList<>();
+ Collector<RowData> collector = new ListCollector<>(actualRows);
+ deserializer.flatMap(inLongMsg.buildArray(), collector);
+ assertEquals(1, errorHandler.getRowCount());
+
+ InLongMsg inLongMsg1 = InLongMsg.newInLongMsg();
+ String abNormalAttrs =
"m=0&iname=testInterfaceId&__addcol1__=1&__addcol2__=2";
+ inLongMsg1.addMsg(abNormalAttrs, body1.getBytes());
+ inLongMsg1.addMsg(abNormalAttrs, body2.getBytes());
+ deserializer.flatMap(inLongMsg1.buildArray(), collector);
+ assertEquals(1, errorHandler.getHeadCount());
+ }
+
+ @Test
+ public void testNormal() throws Exception {
+
+ InLongMsg inLongMsg = InLongMsg.newInLongMsg();
+ String attrs =
"m=0&iname=testInterfaceId&t=20200322&__addcol1__=1&__addcol2__=2";
+ String body1 = "f1=123&f2=field11&f3=field12&f4=field13";
+ String body2 = "f1=123&f2=field21&f3=field22&f4=field23";
+ inLongMsg.addMsg(attrs, body1.getBytes());
+ inLongMsg.addMsg(attrs, body2.getBytes());
+
+ Map<String, String> expectedAttributes = new HashMap<>();
+ expectedAttributes.put("m", "0");
+ expectedAttributes.put("iname", "testInterfaceId");
+ expectedAttributes.put("t", "20200322");
+ expectedAttributes.put("__addcol1__", "1");
+ expectedAttributes.put("__addcol2__", "2");
+
+ GenericRowData expectRowData1 = new GenericRowData(8);
+ expectRowData1.setField(0,
TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00")));
+ expectRowData1.setField(1, mapConvert.convert(expectedAttributes));
+ expectRowData1.setField(2, 1);
+ expectRowData1.setField(3, 2);
+ expectRowData1.setField(4, 123);
+ expectRowData1.setField(5, StringData.fromString("field11"));
+ expectRowData1.setField(6, StringData.fromString("field12"));
+ expectRowData1.setField(7, StringData.fromString("field13"));
+
+ GenericRowData expectRowData2 = new GenericRowData(8);
+ expectRowData2.setField(0,
TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00")));
+ expectRowData2.setField(1, mapConvert.convert(expectedAttributes));
+ expectRowData2.setField(2, 1);
+ expectRowData2.setField(3, 2);
+ expectRowData2.setField(4, 123);
+ expectRowData2.setField(5, StringData.fromString("field21"));
+ expectRowData2.setField(6, StringData.fromString("field22"));
+ expectRowData2.setField(7, StringData.fromString("field23"));
+
+ InLongMsgKvFormatDeserializer deserializer =
+ new InLongMsgKvFormatDeserializer.Builder(TEST_ROW_INFO)
+ .setTimeFieldName("inlongmsg_time")
+ .setAttributesFieldName("inlongmsg_attributes")
+ .build();
+ testRowDeserialization(
+ deserializer,
+ inLongMsg.buildArray(),
+ Arrays.asList(expectRowData1, expectRowData2));
+ }
+
+ @Test
+ public void testNullField() throws Exception {
+
+ InLongMsg inLongMsg = InLongMsg.newInLongMsg();
+ String attrs =
"m=0&iname=testInterfaceId&t=20200322&__addcol1__=1&__addcol2__=2";
+ String body1 = "f1=123&f2=field11&f3=field12";
+ String body2 = "f1=123&f2=field21&f4=field23&f5=field24&=";
+ inLongMsg.addMsg(attrs, body1.getBytes());
+ inLongMsg.addMsg(attrs, body2.getBytes());
+
+ Map<String, String> expectedAttributes = new HashMap<>();
+ expectedAttributes.put("m", "0");
+ expectedAttributes.put("iname", "testInterfaceId");
+ expectedAttributes.put("t", "20200322");
+ expectedAttributes.put("__addcol1__", "1");
+ expectedAttributes.put("__addcol2__", "2");
+
+ GenericRowData expectRowData1 = new GenericRowData(8);
+ expectRowData1.setField(0,
TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00")));
+ expectRowData1.setField(1, mapConvert.convert(expectedAttributes));
+ expectRowData1.setField(2, 1);
+ expectRowData1.setField(3, 2);
+ expectRowData1.setField(4, 123);
+ expectRowData1.setField(5, StringData.fromString("field11"));
+ expectRowData1.setField(6, StringData.fromString("field12"));
+ expectRowData1.setField(7, null);
+
+ GenericRowData expectRowData2 = new GenericRowData(8);
+ expectRowData2.setField(0,
TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00")));
+ expectRowData2.setField(1, mapConvert.convert(expectedAttributes));
+ expectRowData2.setField(2, 1);
+ expectRowData2.setField(3, 2);
+ expectRowData2.setField(4, 123);
+ expectRowData2.setField(5, StringData.fromString("field21"));
+ expectRowData2.setField(6, null);
+ expectRowData2.setField(7, StringData.fromString("field23"));
+
+ InLongMsgKvFormatDeserializer deserializer =
+ new InLongMsgKvFormatDeserializer.Builder(TEST_ROW_INFO)
+ .setTimeFieldName("inlongmsg_time")
+ .setAttributesFieldName("inlongmsg_attributes")
+ .build();
+ testRowDeserialization(
+ deserializer,
+ inLongMsg.buildArray(),
+ Arrays.asList(expectRowData1, expectRowData2));
+ }
+
+ @Test
+ public void testNullField1() throws Exception {
+
+ InLongMsg inLongMsg = InLongMsg.newInLongMsg();
+ String attrs =
"m=0&iname=testInterfaceId&t=20200322&__addcol1__=1&__addcol2__=2";
+ String body1 = "f1=123&f2=field11&f3=field12";
+ String body2 = "f1=123&f2=field21&f4=field23&f5=field24&=";
+ inLongMsg.addMsg(attrs, body1.getBytes());
+ inLongMsg.addMsg(attrs, body2.getBytes());
+
+ Map<String, String> expectedAttributes = new HashMap<>();
+ expectedAttributes.put("m", "0");
+ expectedAttributes.put("iname", "testInterfaceId");
+ expectedAttributes.put("t", "20200322");
+ expectedAttributes.put("__addcol1__", "1");
+ expectedAttributes.put("__addcol2__", "2");
+
+ GenericRowData expectRowData1 = new GenericRowData(8);
+ expectRowData1.setField(0,
TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00")));
+ expectRowData1.setField(1, mapConvert.convert(expectedAttributes));
+ expectRowData1.setField(2, null);
+ expectRowData1.setField(3, null);
+ expectRowData1.setField(4, 123);
+ expectRowData1.setField(5, StringData.fromString("field11"));
+ expectRowData1.setField(6, StringData.fromString("field12"));
+ expectRowData1.setField(7, null);
+
+ GenericRowData expectRowData2 = new GenericRowData(8);
+ expectRowData2.setField(0,
TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00")));
+ expectRowData2.setField(1, mapConvert.convert(expectedAttributes));
+ expectRowData2.setField(2, null);
+ expectRowData2.setField(3, null);
+ expectRowData2.setField(4, 123);
+ expectRowData2.setField(5, StringData.fromString("field21"));
+ expectRowData2.setField(6, null);
+ expectRowData2.setField(7, StringData.fromString("field23"));
+
+ InLongMsgKvFormatDeserializer deserializer =
+ new InLongMsgKvFormatDeserializer.Builder(TEST_ROW_INFO)
+ .setTimeFieldName("inlongmsg_time")
+ .setAttributesFieldName("inlongmsg_attributes")
+ .setRetainPredefinedField(false)
+ .build();
+ testRowDeserialization(
+ deserializer,
+ inLongMsg.buildArray(),
+ Arrays.asList(expectRowData1, expectRowData2));
+ }
+
+ @Test
+ public void testLineDelimiter() throws Exception {
+ InLongMsg inLongMsg = InLongMsg.newInLongMsg();
+ String attrs =
"m=0&iname=testInterfaceId&t=20200322&__addcol1__=1&__addcol2__=2";
+ String body = "f1=123&f2=field11\nf1=1&f2=2";
+ inLongMsg.addMsg(attrs, body.getBytes());
+
+ Map<String, String> expectedAttributes = new HashMap<>();
+ expectedAttributes.put("m", "0");
+ expectedAttributes.put("iname", "testInterfaceId");
+ expectedAttributes.put("t", "20200322");
+ expectedAttributes.put("__addcol1__", "1");
+ expectedAttributes.put("__addcol2__", "2");
+
+ GenericRowData expectRowData1 = new GenericRowData(8);
+ expectRowData1.setField(0,
TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00")));
+ expectRowData1.setField(1, mapConvert.convert(expectedAttributes));
+ expectRowData1.setField(2, 1);
+ expectRowData1.setField(3, 2);
+ expectRowData1.setField(4, 123);
+ expectRowData1.setField(5, StringData.fromString("field11"));
+ expectRowData1.setField(6, null);
+ expectRowData1.setField(7, null);
+
+ GenericRowData expectRowData2 = new GenericRowData(8);
+ expectRowData2.setField(0,
TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00")));
+ expectRowData2.setField(1, mapConvert.convert(expectedAttributes));
+ expectRowData2.setField(2, 1);
+ expectRowData2.setField(3, 2);
+ expectRowData2.setField(4, 1);
+ expectRowData2.setField(5, StringData.fromString("2"));
+ expectRowData2.setField(6, null);
+ expectRowData2.setField(7, null);
+
+ InLongMsgKvFormatDeserializer deserializer =
+ new InLongMsgKvFormatDeserializer.Builder(TEST_ROW_INFO)
+ .setTimeFieldName("inlongmsg_time")
+ .setAttributesFieldName("inlongmsg_attributes")
+ .setLineDelimiter('\n')
+ .build();
+
+ testRowDeserialization(
+ deserializer,
+ inLongMsg.buildArray(),
+ Arrays.asList(expectRowData1, expectRowData2));
+ }
+
+ private void testRowDeserialization(
+ InLongMsgKvFormatDeserializer deserializer,
+ byte[] bytes,
+ List<RowData> expectedRows) throws Exception {
+ List<RowData> actualRows = new ArrayList<>();
+ Collector<RowData> collector = new ListCollector<>(actualRows);
+ deserializer.flatMap(bytes, collector);
+ assertEquals(expectedRows, actualRows);
+ }
+
+ private static class TestFailureHandler implements FailureHandler {
+
+ private int headCount = 0;
+ private int bodyCount = 0;
+ private int rowCount = 0;
+
+ public int getHeadCount() {
+ return headCount;
+ }
+
+ public int getBodyCount() {
+ return bodyCount;
+ }
+
+ public int getRowCount() {
+ return rowCount;
+ }
+
+ @Override
+ public void onParsingHeadFailure(String attribute, Exception
exception) throws Exception {
+ headCount++;
+ }
+
+ @Override
+ public void onParsingBodyFailure(byte[] body, Exception exception)
throws Exception {
+ bodyCount++;
+ }
+
+ @Override
+ public void onConvertingRowFailure(InLongMsgHead head,
+ InLongMsgBody body, Exception exception) throws Exception {
+ rowCount++;
+ }
+ }
+}
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatFactoryTest.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatFactoryTest.java
new file mode 100644
index 0000000000..2e66ee93e8
--- /dev/null
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatFactoryTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsgkv;
+
+import org.apache.inlong.sort.formats.common.DateFormatInfo;
+import org.apache.inlong.sort.formats.common.FormatInfo;
+import org.apache.inlong.sort.formats.common.FormatUtils;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.MapFormatInfo;
+import org.apache.inlong.sort.formats.common.RowFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.TestDynamicTableFactory;
+import
org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static
org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_CHARSET;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link InLongMsgKvFormatFactory}.
+ */
+public class InLongMsgKvFormatFactoryTest {
+
+ public RowFormatInfo testFormatInfo;
+
+ public TypeInformation<RowData> testTypeInformation;
+
+ public ResolvedSchema resolvedSchema;
+
+ public DataType dataType;
+
+ @Before
+ public void setup() {
+ resolvedSchema =
+ ResolvedSchema.of(
+ Column.physical("time", DataTypes.TIMESTAMP()),
+ Column.physical("attributes",
DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())),
+ Column.physical("student_name", DataTypes.STRING()),
+ Column.physical("score", DataTypes.INT()),
+ Column.physical("date", DataTypes.DATE()));
+ dataType = resolvedSchema.toPhysicalRowDataType();
+ RowType rowType = (RowType) dataType.getLogicalType();
+ testTypeInformation = InternalTypeInfo.of(rowType);
+ testFormatInfo = new RowFormatInfo(
+ new String[]{"time", "attributes", "student_name", "score",
"date"},
+ new FormatInfo[]{
+ new TimestampFormatInfo(),
+ new MapFormatInfo(StringFormatInfo.INSTANCE,
StringFormatInfo.INSTANCE),
+ StringFormatInfo.INSTANCE,
+ IntFormatInfo.INSTANCE,
+ new DateFormatInfo("yyyy-MM-dd")
+ });
+ }
+
+ @Test
+ public void testCreateTableFormatDeserializer() {
+ final InLongMsgKvRowDataDeserializationSchema expectedDeSer =
+ new InLongMsgKvRowDataDeserializationSchema.Builder(
+ testFormatInfo)
+ .setCharset(DEFAULT_CHARSET)
+ .setEntryDelimiter('&')
+ .setKvDelimiter('=')
+ .setLineDelimiter(null)
+ .setQuoteCharacter('\"')
+ .setEscapeCharacter('\\')
+ .setNullLiteral("null")
+ .setIgnoreErrors(false)
+ .build();
+
+ final Map<String, String> options = getAllOptions();
+ DeserializationSchema<RowData> actualDeSer =
createDeserializationSchema(options);
+
+ assertEquals(expectedDeSer, actualDeSer);
+ }
+
+ private DeserializationSchema<RowData> createDeserializationSchema(
+ Map<String, String> options) {
+ final DynamicTableSource actualSource =
createTableSource(resolvedSchema, options);
+ assert actualSource instanceof
TestDynamicTableFactory.DynamicTableSourceMock;
+ TestDynamicTableFactory.DynamicTableSourceMock sourceMock =
+ (TestDynamicTableFactory.DynamicTableSourceMock) actualSource;
+
+ return sourceMock.valueFormat.createRuntimeDecoder(
+ ScanRuntimeProviderContext.INSTANCE, dataType);
+ }
+
+ private Map<String, String> getAllOptions() {
+ final Map<String, String> options = new HashMap<>();
+ options.put("connector", TestDynamicTableFactory.IDENTIFIER);
+ options.put("target", "MyTarget");
+ options.put("buffer-size", "1000");
+
+ options.put("format", InLongMsgKvFormatFactory.IDENTIFIER);
+ options.put("inlong-msg-kv.row.format.info",
FormatUtils.marshall(testFormatInfo));
+ options.put("inlong-msg-kv.format.entry-delimiter", "&");
+ options.put("inlong-msg-kv.format.kv-delimiter", "=");
+ options.put("inlong-msg-kv.format.escape-character", "\\");
+ options.put("inlong-msg-kv.format.quote-character", "\"");
+ options.put("inlong-msg-kv.format.null-literal", "null");
+ options.put("inlong-msg-kv.format.ignore-errors", "false");
+ return options;
+ }
+}
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/test/resources/log4j-test.properties
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000000..e07d62aefe
--- /dev/null
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
# Set root logger level to INFO and its only appender to A1.
+log4j.rootLogger=INFO, A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=[%t] %-5p %l %x - %m%n
diff --git a/inlong-sort/sort-formats/format-rowdata/pom.xml
b/inlong-sort/sort-formats/format-rowdata/pom.xml
index 27528c0dc2..9ebf52fa4e 100644
--- a/inlong-sort/sort-formats/format-rowdata/pom.xml
+++ b/inlong-sort/sort-formats/format-rowdata/pom.xml
@@ -39,6 +39,7 @@
<module>format-inlongmsg-rowdata-base</module>
<module>format-inlongmsg-rowdata-binlog</module>
<module>format-inlongmsg-rowdata-pb</module>
+ <module>format-inlongmsg-rowdata-kv</module>
<module>format-inlongmsg-rowdata-csv</module>
</modules>