This is an automated email from the ASF dual-hosted git repository.
baomingyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new a1523ec722 [INLONG-9521][Sort] Sort format supports InLongMsg-kv
format (#9536)
a1523ec722 is described below
commit a1523ec722e905c3c8a8a39fe884f684da5ba0dd
Author: baomingyu <[email protected]>
AuthorDate: Thu Dec 28 15:10:13 2023 +0800
[INLONG-9521][Sort] Sort format supports InLongMsg-kv format (#9536)
---
inlong-sort/sort-dist/pom.xml | 5 +
.../sort-formats/format-inlongmsg-kv/pom.xml | 129 ++++++++
.../sort/formats/inlongmsgkv/InLongMsgKv.java | 105 ++++++
.../inlongmsgkv/InLongMsgKvFormatDeserializer.java | 364 +++++++++++++++++++++
.../inlongmsgkv/InLongMsgKvFormatFactory.java | 163 +++++++++
.../InLongMsgKvMixedFormatConverter.java | 160 +++++++++
.../InLongMsgKvMixedFormatDeserializer.java | 239 ++++++++++++++
.../sort/formats/inlongmsgkv/InLongMsgKvUtils.java | 169 ++++++++++
.../formats/inlongmsgkv/InLongMsgKvValidator.java | 44 +++
.../org.apache.flink.table.factories.TableFactory | 16 +
.../InLongMsgKvFormatDeserializerTest.java | 333 +++++++++++++++++++
.../inlongmsgkv/InLongMsgKvFormatFactoryTest.java | 123 +++++++
.../sort/formats/inlongmsgkv/InLongMsgKvTest.java | 112 +++++++
inlong-sort/sort-formats/pom.xml | 1 +
14 files changed, 1963 insertions(+)
diff --git a/inlong-sort/sort-dist/pom.xml b/inlong-sort/sort-dist/pom.xml
index e7a387b4d6..2fcba9b316 100644
--- a/inlong-sort/sort-dist/pom.xml
+++ b/inlong-sort/sort-dist/pom.xml
@@ -75,6 +75,11 @@
<artifactId>sort-format-inlongmsg-csv</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-format-inlongmsg-kv</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>sort-format-kv</artifactId>
diff --git a/inlong-sort/sort-formats/format-inlongmsg-kv/pom.xml
b/inlong-sort/sort-formats/format-inlongmsg-kv/pom.xml
new file mode 100644
index 0000000000..4c105fc19b
--- /dev/null
+++ b/inlong-sort/sort-formats/format-inlongmsg-kv/pom.xml
@@ -0,0 +1,129 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-formats</artifactId>
+ <version>1.11.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>sort-format-inlongmsg-kv</artifactId>
+ <name>Apache InLong - Sort Format-inlongmsg-kv</name>
+
+ <properties>
+
<inlong.root.dir>${project.parent.parent.parent.basedir}</inlong.root.dir>
+ </properties>
+
+ <dependencies>
+ <!-- core dependencies: every one is declared with provided scope -->
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-format-common</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-format-base</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-format-inlongmsg-base</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ <version>${snappy.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-shaded-jackson</artifactId>
+ <version>${flink.jackson.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- test dependencies: the flink-table-common test-jar, test scope only -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
+ <version>${flink.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+ <profiles>
+ <!-- Builds an additional jar with the sql-jar classifier during the package phase; the profile is active unless the skipSqlJars property is defined -->
+ <profile>
+ <id>sql-jars</id>
+ <activation>
+ <property>
+ <name>!skipSqlJars</name>
+ </property>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ <phase>package</phase>
+ <configuration>
+ <classifier>sql-jar</classifier>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+</project>
diff --git
a/inlong-sort/sort-formats/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKv.java
b/inlong-sort/sort-formats/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKv.java
new file mode 100644
index 0000000000..52eb340ae1
--- /dev/null
+++
b/inlong-sort/sort-formats/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKv.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsgkv;
+
+import org.apache.inlong.sort.formats.base.TextFormatDescriptor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_ENTRY_DELIMITER;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_KV_DELIMITER;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_LINE_DELIMITER;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_RETAIN_PREDEFINED_FIELD;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
+
+/**
+ * Format descriptor for the InLongMsg-KV format; each setter stores one option in the internal properties and returns this descriptor so calls can be chained.
+ */
+public class InLongMsgKv extends TextFormatDescriptor<InLongMsgKv> {
+
+ public static final String FORMAT_TYPE_VALUE = "InLongMsg-KV";
+
+ public InLongMsgKv() {
+ super(FORMAT_TYPE_VALUE, 1);
+ }
+
+ /**
+ * Sets the character that separates entries, stored under FORMAT_ENTRY_DELIMITER ('&' by default).
+ *
+ * @param delimiter the entry delimiter character
+ * @return This descriptor, with the entry delimiter set.
+ */
+ public InLongMsgKv entryDelimiter(char delimiter) {
+ internalProperties.putCharacter(FORMAT_ENTRY_DELIMITER, delimiter);
+ return this;
+ }
+
+ /**
+ * Sets the character that separates a key from its value, stored under FORMAT_KV_DELIMITER ('=' by default).
+ *
+ * @param delimiter the kv delimiter character.
+ * @return This descriptor, with the kv delimiter set.
+ */
+ public InLongMsgKv kvDelimiter(char delimiter) {
+ internalProperties.putCharacter(FORMAT_KV_DELIMITER, delimiter);
+ return this;
+ }
+
+ /**
+ * Sets the delimiter between lines, stored under FORMAT_LINE_DELIMITER.
+ *
+ * @param lineDelimiter The delimiter between lines (e.g. '\n').
+ * @return This descriptor, with the line delimiter set.
+ */
+ public InLongMsgKv lineDelimiter(char lineDelimiter) {
+ internalProperties.putCharacter(FORMAT_LINE_DELIMITER, lineDelimiter);
+ return this;
+ }
+
+ /**
+ * Sets the name of the time field, stored under FORMAT_TIME_FIELD_NAME; the name is checked with checkNotNull first.
+ *
+ * @param timeFieldName The name of the time field; must not be null.
+ * @return This descriptor, with the time field name set.
+ */
+ public InLongMsgKv timeFieldName(String timeFieldName) {
+ checkNotNull(timeFieldName);
+ internalProperties.putString(FORMAT_TIME_FIELD_NAME, timeFieldName);
+ return this;
+ }
+
+ /**
+ * Sets the name of the attributes field, stored under FORMAT_ATTRIBUTES_FIELD_NAME; the name is checked with checkNotNull first.
+ *
+ * @param attributesFieldName The name of the attributes field; must not be null.
+ * @return This descriptor, with the attributes field name set.
+ */
+ public InLongMsgKv attributesFieldName(String attributesFieldName) {
+ checkNotNull(attributesFieldName);
+ internalProperties.putString(FORMAT_ATTRIBUTES_FIELD_NAME,
attributesFieldName);
+ return this;
+ }
+
+ /**
+ * Skips the predefined fields by storing false under FORMAT_RETAIN_PREDEFINED_FIELD; returns this descriptor.
+ */
+ public InLongMsgKv skipPredefinedField() {
+ internalProperties.putBoolean(FORMAT_RETAIN_PREDEFINED_FIELD, false);
+ return this;
+ }
+}
diff --git
a/inlong-sort/sort-formats/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializer.java
b/inlong-sort/sort-formats/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializer.java
new file mode 100644
index 0000000000..c7c16dcf30
--- /dev/null
+++
b/inlong-sort/sort-formats/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializer.java
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsgkv;
+
+import org.apache.inlong.sort.formats.base.TextFormatBuilder;
+import org.apache.inlong.sort.formats.common.RowFormatInfo;
+import
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgFormatDeserializer;
+import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.types.Row;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_ENTRY_DELIMITER;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_KV_DELIMITER;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_LINE_DELIMITER;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_ENTRY_DELIMITER;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_KV_DELIMITER;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_LINE_DELIMITER;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_PREDEFINED_FIELD;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_RETAIN_PREDEFINED_FIELD;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsgkv.InLongMsgKvUtils.DEFAULT_INLONGMSGKV_CHARSET;
+
+/**
+ * Deserializer for records in InLongMsgKv format; head and body parsing are delegated to {@link InLongMsgKvUtils}, and each parsed body is converted into a single {@link Row}.
+ */
+public final class InLongMsgKvFormatDeserializer extends
AbstractInLongMsgFormatDeserializer {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Format information of the data row; passed to row deserialization and used to derive the produced type.
+ */
+ @Nonnull
+ private final RowFormatInfo rowFormatInfo;
+
+ /**
+ * The name of the time field; may be null.
+ */
+ @Nullable
+ private final String timeFieldName;
+
+ /**
+ * The name of the attributes field; may be null.
+ */
+ @Nullable
+ private final String attributesFieldName;
+
+ /**
+ * The charset of the text; passed to body parsing together with the raw bytes.
+ */
+ @Nonnull
+ private final String charset;
+
+ /**
+ * The delimiter between entries.
+ */
+ @Nonnull
+ private final Character entryDelimiter;
+
+ /**
+ * The delimiter between key and value.
+ */
+ @Nonnull
+ private final Character kvDelimiter;
+
+ /**
+ * The delimiter between lines; may be null.
+ */
+ @Nullable
+ private final Character lineDelimiter;
+
+ /**
+ * Escape character. Null if escaping is disabled.
+ */
+ @Nullable
+ private final Character escapeChar;
+
+ /**
+ * Quote character. Null if quoting is disabled.
+ */
+ @Nullable
+ private final Character quoteChar;
+
+ /**
+ * The literal that represents null values, default "".
+ */
+ @Nullable
+ private final String nullLiteral;
+
+ /**
+ * Whether the head's predefined fields are passed to row deserialization; when false an empty list is passed instead. Defaults to true.
+ */
+ private boolean retainPredefinedField = true;
+
+ public InLongMsgKvFormatDeserializer( // builds the failure handler from ignoreErrors, then sets retainPredefinedField
+ @Nonnull RowFormatInfo rowFormatInfo,
+ @Nullable String timeFieldName,
+ @Nullable String attributesFieldName,
+ @Nonnull String charset,
+ @Nonnull Character entryDelimiter,
+ @Nonnull Character kvDelimiter,
+ @Nullable Character lineDelimiter,
+ @Nullable Character escapeChar,
+ @Nullable Character quoteChar,
+ @Nullable String nullLiteral,
+ @Nonnull Boolean ignoreErrors,
+ boolean retainPredefinedField) {
+ this(
+ rowFormatInfo,
+ timeFieldName,
+ attributesFieldName,
+ charset,
+ entryDelimiter,
+ kvDelimiter,
+ lineDelimiter,
+ escapeChar,
+ quoteChar,
+ nullLiteral,
+ InLongMsgUtils.getDefaultExceptionHandler(ignoreErrors));
+ this.retainPredefinedField = retainPredefinedField; // assigned after this(...) because the delegated constructor does not take this flag
+ }
+
+ public InLongMsgKvFormatDeserializer( // same as the constructor above, but retainPredefinedField keeps its initial value (true)
+ @Nonnull RowFormatInfo rowFormatInfo,
+ @Nullable String timeFieldName,
+ @Nullable String attributesFieldName,
+ @Nonnull String charset,
+ @Nonnull Character entryDelimiter,
+ @Nonnull Character kvDelimiter,
+ @Nullable Character lineDelimiter,
+ @Nullable Character escapeChar,
+ @Nullable Character quoteChar,
+ @Nullable String nullLiteral,
+ @Nonnull Boolean ignoreErrors) {
+ this(
+ rowFormatInfo,
+ timeFieldName,
+ attributesFieldName,
+ charset,
+ entryDelimiter,
+ kvDelimiter,
+ lineDelimiter,
+ escapeChar,
+ quoteChar,
+ nullLiteral,
+ InLongMsgUtils.getDefaultExceptionHandler(ignoreErrors));
+ }
+
+ public InLongMsgKvFormatDeserializer( // primary constructor: takes the failure handler directly and assigns every final field
+ @Nonnull RowFormatInfo rowFormatInfo,
+ @Nullable String timeFieldName,
+ @Nullable String attributesFieldName,
+ @Nonnull String charset,
+ @Nonnull Character entryDelimiter,
+ @Nonnull Character kvDelimiter,
+ @Nullable Character lineDelimiter,
+ @Nullable Character escapeChar,
+ @Nullable Character quoteChar,
+ @Nullable String nullLiteral,
+ @Nonnull FailureHandler failureHandler) {
+ super(failureHandler);
+
+ this.rowFormatInfo = rowFormatInfo;
+ this.timeFieldName = timeFieldName;
+ this.attributesFieldName = attributesFieldName;
+ this.charset = charset;
+ this.entryDelimiter = entryDelimiter;
+ this.kvDelimiter = kvDelimiter;
+ this.lineDelimiter = lineDelimiter;
+ this.escapeChar = escapeChar;
+ this.quoteChar = quoteChar;
+ this.nullLiteral = nullLiteral;
+ }
+
+ @Override
+ public TypeInformation<Row> getProducedType() {
+ return
InLongMsgUtils.decorateRowTypeWithNeededHeadFields(timeFieldName,
attributesFieldName, rowFormatInfo);
+ }
+
+ @Override
+ protected InLongMsgHead parseHead(String attr) {
+ return InLongMsgKvUtils.parseHead(attr); // head parsing is fully delegated to the utility class
+ }
+
+ @Override
+ protected List<InLongMsgBody> parseBodyList(byte[] bytes) {
+ return InLongMsgKvUtils.parseBodyList(
+ bytes,
+ charset,
+ entryDelimiter,
+ kvDelimiter,
+ lineDelimiter,
+ escapeChar,
+ quoteChar);
+ }
+
+ @Override
+ protected List<Row> convertRows(InLongMsgHead head, InLongMsgBody body) {
+ Row dataRow =
+ InLongMsgKvUtils.deserializeRow(
+ rowFormatInfo,
+ nullLiteral,
+ retainPredefinedField ? head.getPredefinedFields() :
Collections.emptyList(),
+ body.getEntries());
+
+ Row row = InLongMsgUtils.decorateRowWithNeededHeadFields( // combines the head's time and attributes with the data row
+ timeFieldName,
+ attributesFieldName,
+ head.getTime(),
+ head.getAttributes(),
+ dataRow);
+
+ return Collections.singletonList(row); // exactly one row per body
+ }
+
+ /**
+ * The builder for {@link InLongMsgKvFormatDeserializer}; its options start from the DEFAULT_* constants and can be overridden through the setters or {@link #configure}.
+ */
+ public static class Builder extends TextFormatBuilder<Builder> {
+
+ private String timeFieldName = DEFAULT_TIME_FIELD_NAME;
+ private String attributesFieldName = DEFAULT_ATTRIBUTES_FIELD_NAME;
+ private Character entryDelimiter = DEFAULT_ENTRY_DELIMITER;
+ private Character kvDelimiter = DEFAULT_KV_DELIMITER;
+ private Character lineDelimiter = DEFAULT_LINE_DELIMITER;
+ private Boolean retainPredefinedField = DEFAULT_PREDEFINED_FIELD;
+
+ public Builder(RowFormatInfo rowFormatInfo) {
+ super(rowFormatInfo);
+
+ this.charset = DEFAULT_INLONGMSGKV_CHARSET; // charset is not declared in this Builder, so this assigns the field inherited from TextFormatBuilder
+ }
+
+ public Builder setTimeFieldName(String timeFieldName) {
+ this.timeFieldName = timeFieldName;
+ return this;
+ }
+
+ public Builder setAttributesFieldName(String attributesFieldName) {
+ this.attributesFieldName = attributesFieldName;
+ return this;
+ }
+
+ public Builder setEntryDelimiter(Character entryDelimiter) {
+ this.entryDelimiter = entryDelimiter;
+ return this;
+ }
+
+ public Builder setKvDelimiter(Character kvDelimiter) {
+ this.kvDelimiter = kvDelimiter;
+ return this;
+ }
+
+ public Builder setLineDelimiter(Character lineDelimiter) {
+ this.lineDelimiter = lineDelimiter;
+ return this;
+ }
+
+ public Builder setRetainPredefinedField(Boolean retainPredefinedField)
{
+ this.retainPredefinedField = retainPredefinedField;
+ return this;
+ }
+
+ public Builder configure(DescriptorProperties descriptorProperties) { // NOTE(review): appears to override TextFormatBuilder#configure (super.configure is called below) - consider @Override
+ super.configure(descriptorProperties);
+
+ descriptorProperties.getOptionalString(FORMAT_TIME_FIELD_NAME) // each option below is applied only when the property is present
+ .ifPresent(this::setTimeFieldName);
+
descriptorProperties.getOptionalString(FORMAT_ATTRIBUTES_FIELD_NAME)
+ .ifPresent(this::setAttributesFieldName);
+ descriptorProperties.getOptionalCharacter(FORMAT_ENTRY_DELIMITER)
+ .ifPresent(this::setEntryDelimiter);
+ descriptorProperties.getOptionalCharacter(FORMAT_KV_DELIMITER)
+ .ifPresent(this::setKvDelimiter);
+ descriptorProperties.getOptionalCharacter(FORMAT_LINE_DELIMITER)
+ .ifPresent(this::setLineDelimiter);
+
descriptorProperties.getOptionalBoolean(FORMAT_RETAIN_PREDEFINED_FIELD)
+ .ifPresent(this::setRetainPredefinedField);
+
+ return this;
+ }
+
+ public InLongMsgKvFormatDeserializer build() {
+ return new InLongMsgKvFormatDeserializer(
+ rowFormatInfo,
+ timeFieldName,
+ attributesFieldName,
+ charset,
+ entryDelimiter,
+ kvDelimiter,
+ lineDelimiter,
+ escapeChar,
+ quoteChar,
+ nullLiteral,
+ ignoreErrors,
+ retainPredefinedField);
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ if (!super.equals(o)) { // the superclass comparison must also succeed
+ return false;
+ }
+
+ InLongMsgKvFormatDeserializer that = (InLongMsgKvFormatDeserializer) o;
+ return rowFormatInfo.equals(that.rowFormatInfo) &&
+ Objects.equals(timeFieldName, that.timeFieldName) &&
+ Objects.equals(attributesFieldName, that.attributesFieldName)
&&
+ charset.equals(that.charset) &&
+ entryDelimiter.equals(that.entryDelimiter) &&
+ kvDelimiter.equals(that.kvDelimiter) &&
+ Objects.equals(lineDelimiter, that.lineDelimiter) &&
+ Objects.equals(escapeChar, that.escapeChar) &&
+ Objects.equals(quoteChar, that.quoteChar) &&
+ Objects.equals(nullLiteral, that.nullLiteral) &&
+ Objects.equals(retainPredefinedField,
that.retainPredefinedField);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), rowFormatInfo, timeFieldName, // hashes the same fields that equals compares
+ attributesFieldName, charset, entryDelimiter, kvDelimiter,
lineDelimiter,
+ escapeChar, quoteChar, nullLiteral, retainPredefinedField);
+ }
+}
diff --git
a/inlong-sort/sort-formats/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatFactory.java
b/inlong-sort/sort-formats/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatFactory.java
new file mode 100644
index 0000000000..b3a0b54c9d
--- /dev/null
+++
b/inlong-sort/sort-formats/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatFactory.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsgkv;
+
+import org.apache.inlong.sort.formats.base.TableFormatDeserializer;
+import
org.apache.inlong.sort.formats.base.TableFormatDeserializer.TableFormatContext;
+import org.apache.inlong.sort.formats.base.TableFormatDeserializerFactory;
+import org.apache.inlong.sort.formats.common.RowFormatInfo;
+import
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgMixedFormatConverter;
+import
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgMixedFormatDeserializer;
+import
org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatConverterValidator;
+import
org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatDeserializerValidator;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatFactory;
+
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.factories.TableFormatFactoryBase;
+import org.apache.flink.types.Row;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_CHARSET;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_ENTRY_DELIMITER;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_ESCAPE_CHARACTER;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_IGNORE_ERRORS;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_KV_DELIMITER;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_LINE_DELIMITER;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_NULL_LITERAL;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_QUOTE_CHARACTER;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_SCHEMA;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_RETAIN_PREDEFINED_FIELD;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.getDataRowFormatInfo;
+
+/**
+ * Table format factory for the InLongMsg-KV format; the methods defined here create the configured
deserializer, the mixed-format deserializer and the mixed-format converter,
+ * each built from the given format properties after they have been validated.
+ */
+public final class InLongMsgKvFormatFactory
+ extends
+ TableFormatFactoryBase<Row>
+ implements
+ TableFormatDeserializerFactory,
+ InLongMsgMixedFormatFactory {
+
+ public InLongMsgKvFormatFactory() {
+ super(InLongMsgKv.FORMAT_TYPE_VALUE, 1, false);
+ }
+
+ @Override
+ public List<String> supportedFormatProperties() {
+ final List<String> properties = new ArrayList<>(); // a fresh mutable list is returned on every call
+ properties.add(FORMAT_CHARSET);
+ properties.add(FORMAT_ENTRY_DELIMITER);
+ properties.add(FORMAT_KV_DELIMITER);
+ properties.add(FORMAT_LINE_DELIMITER);
+ properties.add(FORMAT_ESCAPE_CHARACTER);
+ properties.add(FORMAT_QUOTE_CHARACTER);
+ properties.add(FORMAT_NULL_LITERAL);
+ properties.add(FORMAT_IGNORE_ERRORS);
+ properties.add(FORMAT_SCHEMA);
+ properties.add(FORMAT_TIME_FIELD_NAME);
+ properties.add(FORMAT_ATTRIBUTES_FIELD_NAME);
+ properties.add(FORMAT_RETAIN_PREDEFINED_FIELD);
+ return properties;
+ }
+
+ @Override
+ public InLongMsgKvFormatDeserializer createFormatDeserializer(
+ Map<String, String> properties) {
+ final DescriptorProperties descriptorProperties =
+ new DescriptorProperties(true); // true = normalize keys (Flink DescriptorProperties constructor argument)
+ descriptorProperties.putProperties(properties);
+
+ InLongMsgKvValidator validator = new InLongMsgKvValidator();
+ validator.validate(descriptorProperties); // validation runs before anything is built from the properties
+
+ RowFormatInfo rowFormatInfo =
getDataRowFormatInfo(descriptorProperties);
+
+ InLongMsgKvFormatDeserializer.Builder builder =
+ new InLongMsgKvFormatDeserializer.Builder(rowFormatInfo);
+ builder.configure(descriptorProperties);
+
+ return builder.build();
+ }
+
+ @Override
+ public TableFormatDeserializer createFormatDeserializer(TableFormatContext
context) {
+ TableFormatDeserializer deserializer =
+ createFormatDeserializer(context.getFormatProperties()); // delegates to the Map-based overload
+ deserializer.init(context); // the deserializer is initialized with the context before it is returned
+ return deserializer;
+ }
+
+ @Override
+ public InLongMsgKvMixedFormatDeserializer createMixedFormatDeserializer(
+ Map<String, String> properties) {
+ final DescriptorProperties descriptorProperties =
+ new DescriptorProperties(true); // true = normalize keys (Flink DescriptorProperties constructor argument)
+ descriptorProperties.putProperties(properties);
+
+ InLongMsgMixedFormatDeserializerValidator validator =
+ new InLongMsgMixedFormatDeserializerValidator();
+ validator.validate(descriptorProperties);
+
+ InLongMsgKvMixedFormatDeserializer.Builder builder =
+ new InLongMsgKvMixedFormatDeserializer.Builder(); // unlike the other builders here, this one takes no row format info
+ builder.configure(descriptorProperties);
+
+ return builder.build();
+ }
+
+ @Override
+ public AbstractInLongMsgMixedFormatConverter createMixedFormatConverter(
+ AbstractInLongMsgMixedFormatConverter.TableFormatContext context) {
+ return createMixedFormatConverter(context.getFormatProperties()); // delegates to the Map-based overload; no init call is made here
+ }
+
+ @Override
+ public AbstractInLongMsgMixedFormatDeserializer
createMixedFormatDeserializer(
+ TableFormatContext context) {
+ InLongMsgKvMixedFormatDeserializer deserializer =
+ createMixedFormatDeserializer(context.getFormatProperties()); // delegates to the Map-based overload
+ deserializer.init(context); // the deserializer is initialized with the context before it is returned
+ return deserializer;
+ }
+
+ @Override
+ public InLongMsgKvMixedFormatConverter createMixedFormatConverter(
+ Map<String, String> properties) {
+ final DescriptorProperties descriptorProperties =
+ new DescriptorProperties(true); // true = normalize keys (Flink DescriptorProperties constructor argument)
+ descriptorProperties.putProperties(properties);
+
+ InLongMsgMixedFormatConverterValidator validator =
+ new InLongMsgMixedFormatConverterValidator();
+ validator.validate(descriptorProperties);
+
+ RowFormatInfo rowFormatInfo =
getDataRowFormatInfo(descriptorProperties);
+ InLongMsgKvMixedFormatConverter.Builder builder =
+ new InLongMsgKvMixedFormatConverter.Builder(rowFormatInfo);
+ builder.configure(descriptorProperties);
+
+ return builder.build();
+ }
+}
diff --git
a/inlong-sort/sort-formats/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvMixedFormatConverter.java
b/inlong-sort/sort-formats/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvMixedFormatConverter.java
new file mode 100644
index 0000000000..59195b87e1
--- /dev/null
+++
b/inlong-sort/sort-formats/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvMixedFormatConverter.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsgkv;
+
+import org.apache.inlong.sort.formats.common.RowFormatInfo;
+import
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgMixedFormatConverter;
+import
org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatConverterBuilder;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.types.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.sql.Timestamp;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Converter used to deserialize a mixed row in InLongMsg-kv format.
+ *
+ * <p>The data row is deserialized by {@link InLongMsgKvUtils#deserializeRow}
+ * from the predefined fields and the key-value entries, and the result is
+ * passed, together with the message time and attributes, to
+ * {@link InLongMsgUtils#decorateRowWithNeededHeadFields}.</p>
+ */
+public class InLongMsgKvMixedFormatConverter extends AbstractInLongMsgMixedFormatConverter {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(InLongMsgKvMixedFormatConverter.class);
+
+    /**
+     * The schema of the rows.
+     */
+    @Nonnull
+    private final RowFormatInfo rowFormatInfo;
+
+    /**
+     * The name of the time field.
+     */
+    @Nullable
+    private final String timeFieldName;
+
+    /**
+     * The name of the attributes field.
+     */
+    @Nullable
+    private final String attributesFieldName;
+
+    /**
+     * The literal representing null values.
+     */
+    private final String nullLiteral;
+
+    /**
+     * Creates a converter.
+     *
+     * @param rowFormatInfo the schema of the data rows
+     * @param timeFieldName the name of the time field, may be null
+     * @param attributesFieldName the name of the attributes field, may be null
+     * @param nullLiteral the literal representing null values, may be null
+     * @param ignoreErrors forwarded to the superclass constructor
+     */
+    public InLongMsgKvMixedFormatConverter(
+            @Nonnull RowFormatInfo rowFormatInfo,
+            @Nullable String timeFieldName,
+            @Nullable String attributesFieldName,
+            @Nullable String nullLiteral,
+            boolean ignoreErrors) {
+        super(ignoreErrors);
+
+        this.rowFormatInfo = rowFormatInfo;
+        this.timeFieldName = timeFieldName;
+        this.attributesFieldName = attributesFieldName;
+        this.nullLiteral = nullLiteral;
+    }
+
+    @Override
+    public TypeInformation<Row> getProducedType() {
+        return InLongMsgUtils.decorateRowTypeWithNeededHeadFields(
+                timeFieldName, attributesFieldName, rowFormatInfo);
+    }
+
+    /**
+     * Converts one mixed record into a single-element list of rows.
+     *
+     * @param attributes the attributes of the message, passed on to the head-field decoration
+     * @param data the raw body bytes; not used by this implementation
+     * @param tid the tid of the message; not used by this implementation
+     * @param time the time of the message, passed on to the head-field decoration
+     * @param predefinedFields the predefined fields of the message
+     * @param fields the positional fields; not used by this implementation
+     * @param entries the key-value entries of the body
+     * @return a list holding exactly one decorated row
+     */
+    @Override
+    public List<Row> convertRows(
+            Map<String, String> attributes,
+            byte[] data,
+            String tid,
+            Timestamp time,
+            List<String> predefinedFields,
+            List<String> fields,
+            Map<String, String> entries) {
+        Row dataRow =
+                InLongMsgKvUtils.deserializeRow(
+                        rowFormatInfo,
+                        nullLiteral,
+                        predefinedFields,
+                        entries);
+
+        Row row = InLongMsgUtils.decorateRowWithNeededHeadFields(
+                timeFieldName,
+                attributesFieldName,
+                time,
+                attributes,
+                dataRow);
+
+        return Collections.singletonList(row);
+    }
+
+    /**
+     * The builder for {@link InLongMsgKvMixedFormatConverter}.
+     */
+    public static class Builder extends InLongMsgMixedFormatConverterBuilder {
+
+        public Builder(RowFormatInfo rowFormatInfo) {
+            super(rowFormatInfo);
+        }
+
+        /**
+         * Builds the converter from the values currently held by this builder.
+         *
+         * @return the configured converter
+         */
+        public InLongMsgKvMixedFormatConverter build() {
+            return new InLongMsgKvMixedFormatConverter(
+                    rowFormatInfo,
+                    timeFieldName,
+                    attributesFieldName,
+                    nullLiteral,
+                    ignoreErrors);
+        }
+    }
+
+    @Override
+    public boolean equals(Object object) {
+        if (this == object) {
+            return true;
+        }
+
+        if (object == null || getClass() != object.getClass()) {
+            return false;
+        }
+
+        if (!super.equals(object)) {
+            return false;
+        }
+
+        InLongMsgKvMixedFormatConverter that = (InLongMsgKvMixedFormatConverter) object;
+        return rowFormatInfo.equals(that.rowFormatInfo) &&
+                Objects.equals(timeFieldName, that.timeFieldName) &&
+                Objects.equals(attributesFieldName, that.attributesFieldName) &&
+                Objects.equals(nullLiteral, that.nullLiteral);
+    }
+
+    @Override
+    public int hashCode() {
+        // equals() consults super.equals(), so the superclass hash is mixed in as well,
+        // in the same way InLongMsgKvMixedFormatDeserializer#hashCode does.
+        return Objects.hash(super.hashCode(), rowFormatInfo, timeFieldName, attributesFieldName,
+                nullLiteral);
+    }
+}
diff --git
a/inlong-sort/sort-formats/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvMixedFormatDeserializer.java
b/inlong-sort/sort-formats/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvMixedFormatDeserializer.java
new file mode 100644
index 0000000000..34e67780a3
--- /dev/null
+++
b/inlong-sort/sort-formats/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvMixedFormatDeserializer.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsgkv;
+
+import
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgMixedFormatDeserializer;
+import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
+import
org.apache.inlong.sort.formats.inlongmsg.InLongMsgTextMixedFormatDeserializerBuilder;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.types.Row;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_ENTRY_DELIMITER;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_KV_DELIMITER;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_LINE_DELIMITER;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_ENTRY_DELIMITER;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_KV_DELIMITER;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_LINE_DELIMITER;
+import static
org.apache.inlong.sort.formats.inlongmsgkv.InLongMsgKvUtils.DEFAULT_INLONGMSGKV_CHARSET;
+
+/**
+ * The deserializer for the records in InLongMsgKv format.
+ *
+ * <p>Head and body parsing are delegated to {@link InLongMsgKvUtils}; every
+ * parsed body is turned into one mixed row of type
+ * {@link InLongMsgUtils#MIXED_ROW_TYPE}.</p>
+ */
+public final class InLongMsgKvMixedFormatDeserializer extends AbstractInLongMsgMixedFormatDeserializer {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * The charset of the text.
+     */
+    @Nonnull
+    private final String charset;
+
+    /**
+     * The delimiter between entries.
+     */
+    @Nonnull
+    private final Character entryDelimiter;
+
+    /**
+     * The delimiter between key and value.
+     */
+    @Nonnull
+    private final Character kvDelimiter;
+
+    /**
+     * The delimiter between lines.
+     */
+    @Nullable
+    private final Character lineDelimiter;
+
+    /**
+     * Escape character. Null if escaping is disabled.
+     */
+    @Nullable
+    private final Character escapeChar;
+
+    /**
+     * Quote character. Null if quoting is disabled.
+     */
+    @Nullable
+    private final Character quoteChar;
+
+    /**
+     * Creates a deserializer whose failure handler is obtained from
+     * {@link InLongMsgUtils#getDefaultExceptionHandler} for the given flag.
+     *
+     * @param charset the charset of the text
+     * @param entryDelimiter the delimiter between entries
+     * @param kvDelimiter the delimiter between key and value
+     * @param lineDelimiter the delimiter between lines, may be null
+     * @param escapeChar the escape character, null if escaping is disabled
+     * @param quoteChar the quote character, null if quoting is disabled
+     * @param ignoreErrors the flag used to select the default failure handler
+     */
+    public InLongMsgKvMixedFormatDeserializer(
+            @Nonnull String charset,
+            @Nonnull Character entryDelimiter,
+            @Nonnull Character kvDelimiter,
+            @Nullable Character lineDelimiter,
+            @Nullable Character escapeChar,
+            @Nullable Character quoteChar,
+            @Nonnull Boolean ignoreErrors) {
+        this(
+                charset,
+                entryDelimiter,
+                kvDelimiter,
+                lineDelimiter,
+                escapeChar,
+                quoteChar,
+                InLongMsgUtils.getDefaultExceptionHandler(ignoreErrors));
+    }
+
+    /**
+     * Creates a deserializer with an explicit failure handler.
+     *
+     * @param charset the charset of the text
+     * @param entryDelimiter the delimiter between entries
+     * @param kvDelimiter the delimiter between key and value
+     * @param lineDelimiter the delimiter between lines, may be null
+     * @param escapeChar the escape character, null if escaping is disabled
+     * @param quoteChar the quote character, null if quoting is disabled
+     * @param failureHandler the handler passed to the superclass constructor
+     */
+    public InLongMsgKvMixedFormatDeserializer(
+            @Nonnull String charset,
+            @Nonnull Character entryDelimiter,
+            @Nonnull Character kvDelimiter,
+            @Nullable Character lineDelimiter,
+            @Nullable Character escapeChar,
+            @Nullable Character quoteChar,
+            @Nonnull FailureHandler failureHandler) {
+        super(failureHandler);
+
+        this.charset = charset;
+        this.entryDelimiter = entryDelimiter;
+        this.kvDelimiter = kvDelimiter;
+        this.lineDelimiter = lineDelimiter;
+        this.escapeChar = escapeChar;
+        this.quoteChar = quoteChar;
+    }
+
+    @Override
+    public TypeInformation<Row> getProducedType() {
+        return InLongMsgUtils.MIXED_ROW_TYPE;
+    }
+
+    @Override
+    protected InLongMsgHead parseHead(String attr) {
+        return InLongMsgKvUtils.parseHead(attr);
+    }
+
+    @Override
+    protected List<InLongMsgBody> parseBodyList(byte[] bytes) throws Exception {
+        return InLongMsgKvUtils.parseBodyList(
+                bytes, charset, entryDelimiter, kvDelimiter, lineDelimiter, escapeChar, quoteChar);
+    }
+
+    @Override
+    protected List<Row> convertRows(InLongMsgHead head, InLongMsgBody body) {
+        // One body always yields exactly one mixed row, built with the tid taken from the head.
+        return Collections.singletonList(
+                InLongMsgUtils.buildMixedRow(head, body, head.getTid()));
+    }
+
+    /**
+     * The builder for {@link InLongMsgKvMixedFormatDeserializer}.
+     */
+    public static class Builder extends InLongMsgTextMixedFormatDeserializerBuilder<Builder> {
+
+        private Character entryDelimiter = DEFAULT_ENTRY_DELIMITER;
+        private Character kvDelimiter = DEFAULT_KV_DELIMITER;
+        private Character lineDelimiter = DEFAULT_LINE_DELIMITER;
+
+        /**
+         * Creates a builder whose charset starts out as
+         * {@link InLongMsgKvUtils#DEFAULT_INLONGMSGKV_CHARSET}.
+         */
+        public Builder() {
+            this.charset = DEFAULT_INLONGMSGKV_CHARSET;
+        }
+
+        public Builder setEntryDelimiter(Character entryDelimiter) {
+            this.entryDelimiter = entryDelimiter;
+            return this;
+        }
+
+        public Builder setKvDelimiter(Character kvDelimiter) {
+            this.kvDelimiter = kvDelimiter;
+            return this;
+        }
+
+        public Builder setLineDelimiter(Character lineDelimiter) {
+            this.lineDelimiter = lineDelimiter;
+            return this;
+        }
+
+        /**
+         * Applies the superclass configuration and then overrides the three
+         * delimiters with any values present in the given properties.
+         *
+         * @param descriptorProperties the properties to read the settings from
+         * @return this builder
+         */
+        @Override
+        public Builder configure(DescriptorProperties descriptorProperties) {
+            super.configure(descriptorProperties);
+
+            descriptorProperties
+                    .getOptionalCharacter(FORMAT_ENTRY_DELIMITER)
+                    .ifPresent(this::setEntryDelimiter);
+            descriptorProperties
+                    .getOptionalCharacter(FORMAT_KV_DELIMITER)
+                    .ifPresent(this::setKvDelimiter);
+            descriptorProperties
+                    .getOptionalCharacter(FORMAT_LINE_DELIMITER)
+                    .ifPresent(this::setLineDelimiter);
+
+            return this;
+        }
+
+        /**
+         * Builds the deserializer from the values currently held by this builder.
+         *
+         * @return the configured deserializer
+         */
+        public InLongMsgKvMixedFormatDeserializer build() {
+            return new InLongMsgKvMixedFormatDeserializer(
+                    charset,
+                    entryDelimiter,
+                    kvDelimiter,
+                    lineDelimiter,
+                    escapeChar,
+                    quoteChar,
+                    ignoreErrors);
+        }
+    }
+
+    @Override
+    public boolean equals(Object object) {
+        if (this == object) {
+            return true;
+        }
+        if (object == null || getClass() != object.getClass() || !super.equals(object)) {
+            return false;
+        }
+
+        final InLongMsgKvMixedFormatDeserializer other = (InLongMsgKvMixedFormatDeserializer) object;
+
+        // The non-null settings are compared first, in declaration order.
+        if (!charset.equals(other.charset)) {
+            return false;
+        }
+        if (!entryDelimiter.equals(other.entryDelimiter)) {
+            return false;
+        }
+        if (!kvDelimiter.equals(other.kvDelimiter)) {
+            return false;
+        }
+
+        return Objects.equals(lineDelimiter, other.lineDelimiter)
+                && Objects.equals(escapeChar, other.escapeChar)
+                && Objects.equals(quoteChar, other.quoteChar);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                super.hashCode(),
+                charset,
+                entryDelimiter,
+                kvDelimiter,
+                lineDelimiter,
+                escapeChar,
+                quoteChar);
+    }
+}
diff --git
a/inlong-sort/sort-formats/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
b/inlong-sort/sort-formats/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
new file mode 100644
index 0000000000..9964429c2e
--- /dev/null
+++
b/inlong-sort/sort-formats/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsgkv;
+
+import org.apache.inlong.sort.formats.common.FormatInfo;
+import org.apache.inlong.sort.formats.common.RowFormatInfo;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
+
+import org.apache.flink.types.Row;
+
+import java.nio.charset.Charset;
+import java.sql.Timestamp;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static
org.apache.inlong.sort.formats.base.TableFormatUtils.deserializeBasicField;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_STREAM_ID;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TID;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TIME_DT;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TIME_T;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.getPredefinedFields;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseAttr;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseDateTime;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseEpochTime;
+import static org.apache.inlong.sort.formats.util.StringUtils.splitKv;
+
+/**
+ * Utilities for {@link InLongMsgKv}.
+ */
+public class InLongMsgKvUtils {
+
+    /**
+     * The name of the default charset for the InLongMsg-kv format.
+     */
+    public static final String DEFAULT_INLONGMSGKV_CHARSET = "ISO_8859_1";
+
+    /**
+     * Parses the head of a message from its attribute string.
+     *
+     * <p>The tid is read from the {@code INLONGMSG_ATTR_STREAM_ID} attribute when
+     * present and from {@code INLONGMSG_ATTR_TID} otherwise. The time is read from
+     * {@code INLONGMSG_ATTR_TIME_DT} (parsed with {@code parseEpochTime}) when
+     * present and from {@code INLONGMSG_ATTR_TIME_T} (parsed with
+     * {@code parseDateTime}) otherwise.</p>
+     *
+     * @param attr the attribute string of the message
+     * @return the parsed head
+     * @throws IllegalArgumentException if neither tid attribute or neither time
+     *         attribute is present
+     */
+    public static InLongMsgHead parseHead(String attr) {
+        Map<String, String> attributes = parseAttr(attr);
+
+        String tid;
+        if (attributes.containsKey(INLONGMSG_ATTR_STREAM_ID)) {
+            tid = attributes.get(INLONGMSG_ATTR_STREAM_ID);
+        } else if (attributes.containsKey(INLONGMSG_ATTR_TID)) {
+            tid = attributes.get(INLONGMSG_ATTR_TID);
+        } else {
+            throw new IllegalArgumentException(
+                    "Could not find " + INLONGMSG_ATTR_STREAM_ID +
+                            " or " + INLONGMSG_ATTR_TID + " in attributes!");
+        }
+
+        Timestamp time;
+        if (attributes.containsKey(INLONGMSG_ATTR_TIME_DT)) {
+            String epoch = attributes.get(INLONGMSG_ATTR_TIME_DT).trim();
+            time = parseEpochTime(epoch);
+        } else if (attributes.containsKey(INLONGMSG_ATTR_TIME_T)) {
+            String date = attributes.get(INLONGMSG_ATTR_TIME_T).trim();
+            time = parseDateTime(date);
+        } else {
+            // Name both attributes that were looked up, not the same one twice.
+            throw new IllegalArgumentException(
+                    "Could not find " + INLONGMSG_ATTR_TIME_DT +
+                            " or " + INLONGMSG_ATTR_TIME_T + " in attributes!");
+        }
+
+        List<String> predefinedFields = getPredefinedFields(attributes);
+
+        return new InLongMsgHead(attributes, tid, time, predefinedFields);
+    }
+
+    /**
+     * Parses the bodies contained in the given bytes.
+     *
+     * <p>The bytes are decoded with the given charset and split by
+     * {@code splitKv}; one body is created per map returned by the split.
+     * Every body references the complete input byte array.</p>
+     *
+     * @param bytes the raw body bytes
+     * @param charset the name of the charset used to decode the bytes
+     * @param entryDelimiter the delimiter between entries
+     * @param kvDelimiter the delimiter between key and value
+     * @param lineDelimiter the delimiter between lines, may be null
+     * @param escapeChar the escape character, may be null
+     * @param quoteChar the quote character, may be null
+     * @return the parsed bodies
+     */
+    public static List<InLongMsgBody> parseBodyList(
+            byte[] bytes,
+            String charset,
+            char entryDelimiter,
+            char kvDelimiter,
+            Character lineDelimiter,
+            Character escapeChar,
+            Character quoteChar) {
+        String text = new String(bytes, Charset.forName(charset));
+
+        List<Map<String, String>> list =
+                splitKv(
+                        text,
+                        entryDelimiter,
+                        kvDelimiter,
+                        escapeChar,
+                        quoteChar,
+                        lineDelimiter);
+
+        return list.stream()
+                .map(line -> new InLongMsgBody(bytes, null, Collections.emptyList(), line))
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Deserializes the row from the given entries.
+     *
+     * <p>The leading fields of the row are filled positionally from the
+     * predefined fields; every remaining field is looked up in the entries by
+     * its field name. Predefined fields beyond the row arity are ignored.</p>
+     *
+     * @param rowFormatInfo The format of the fields.
+     * @param nullLiteral The literal for null values.
+     * @param predefinedFields The predefined fields.
+     * @param entries The entries.
+     * @return The row deserialized from the given entries.
+     */
+    public static Row deserializeRow(
+            RowFormatInfo rowFormatInfo,
+            String nullLiteral,
+            List<String> predefinedFields,
+            Map<String, String> entries) {
+        String[] fieldNames = rowFormatInfo.getFieldNames();
+        FormatInfo[] fieldFormatInfos = rowFormatInfo.getFieldFormatInfos();
+
+        Row row = new Row(fieldNames.length);
+
+        int predefinedCount = predefinedFields.size();
+
+        for (int i = 0; i < fieldNames.length; ++i) {
+            String fieldName = fieldNames[i];
+            FormatInfo fieldFormatInfo = fieldFormatInfos[i];
+
+            // Positions covered by predefined fields take their text from there;
+            // all later positions are resolved by name from the key-value entries.
+            String fieldText =
+                    i < predefinedCount
+                            ? predefinedFields.get(i)
+                            : entries.get(fieldName);
+
+            Object field =
+                    deserializeBasicField(
+                            fieldName,
+                            fieldFormatInfo,
+                            fieldText,
+                            nullLiteral);
+            row.setField(i, field);
+        }
+
+        return row;
+    }
+}
diff --git
a/inlong-sort/sort-formats/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvValidator.java
b/inlong-sort/sort-formats/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvValidator.java
new file mode 100644
index 0000000000..e290784c09
--- /dev/null
+++
b/inlong-sort/sort-formats/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvValidator.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsgkv;
+
+import org.apache.inlong.sort.formats.base.TextFormatDescriptorValidator;
+
+import org.apache.flink.table.descriptors.DescriptorProperties;
+
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_ENTRY_DELIMITER;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_KV_DELIMITER;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_LINE_DELIMITER;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.validateInLongMsgSchema;
+
+/**
+ * The validator for {@link InLongMsgKv}.
+ *
+ * <p>On top of the checks of {@link TextFormatDescriptorValidator}, it checks
+ * the entry, key-value and line delimiter properties and the InLongMsg
+ * schema.</p>
+ */
+public class InLongMsgKvValidator extends TextFormatDescriptorValidator {
+
+    @Override
+    public void validate(DescriptorProperties properties) {
+        super.validate(properties);
+
+        // All three delimiters are validated with the same arguments, in this order.
+        final String[] delimiterKeys = {
+                FORMAT_ENTRY_DELIMITER,
+                FORMAT_KV_DELIMITER,
+                FORMAT_LINE_DELIMITER
+        };
+        for (String delimiterKey : delimiterKeys) {
+            properties.validateString(delimiterKey, true, 1, 1);
+        }
+
+        validateInLongMsgSchema(properties);
+    }
+}
diff --git
a/inlong-sort/sort-formats/format-inlongmsg-kv/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
b/inlong-sort/sort-formats/format-inlongmsg-kv/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
new file mode 100644
index 0000000000..4b312acf4a
--- /dev/null
+++
b/inlong-sort/sort-formats/format-inlongmsg-kv/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.inlong.sort.formats.inlongmsgkv.InLongMsgKvFormatFactory
diff --git
a/inlong-sort/sort-formats/format-inlongmsg-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializerTest.java
b/inlong-sort/sort-formats/format-inlongmsg-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializerTest.java
new file mode 100644
index 0000000000..911486ed6a
--- /dev/null
+++
b/inlong-sort/sort-formats/format-inlongmsg-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializerTest.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsgkv;
+
+import org.apache.inlong.common.msg.InLongMsg;
+import org.apache.inlong.sort.formats.common.FormatInfo;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.RowFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
+
+import org.apache.flink.api.common.functions.util.ListCollector;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_ENTRY_DELIMITER;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_KV_DELIMITER;
+import static
org.apache.inlong.sort.formats.inlongmsgkv.InLongMsgKvUtils.DEFAULT_INLONGMSGKV_CHARSET;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit tests for {@link InLongMsgKvFormatDeserializer}.
+ */
+public class InLongMsgKvFormatDeserializerTest {
+
+    /**
+     * The attributes shared by all well-formed test messages.
+     */
+    private static final String TEST_ATTRS =
+            "m=0&streamId=testInterfaceId&t=20200322&__addcol1__=1&__addcol2__=2";
+
+    private static final RowFormatInfo TEST_ROW_INFO =
+            new RowFormatInfo(
+                    new String[]{"pf1", "pf2", "f1", "f2", "f3", "f4"},
+                    new FormatInfo[]{
+                            IntFormatInfo.INSTANCE,
+                            IntFormatInfo.INSTANCE,
+                            IntFormatInfo.INSTANCE,
+                            StringFormatInfo.INSTANCE,
+                            StringFormatInfo.INSTANCE,
+                            StringFormatInfo.INSTANCE
+                    });
+
+    @Test
+    public void testExceptionHandler() throws Exception {
+        TestFailureHandler errorHandler = new TestFailureHandler();
+        InLongMsgKvFormatDeserializer deserializer =
+                new InLongMsgKvFormatDeserializer(
+                        TEST_ROW_INFO,
+                        "inlongmsg_time",
+                        "inlongmsg_attributes",
+                        DEFAULT_INLONGMSGKV_CHARSET,
+                        DEFAULT_ENTRY_DELIMITER,
+                        DEFAULT_KV_DELIMITER,
+                        null,
+                        null,
+                        null,
+                        null,
+                        errorHandler);
+
+        String body1 = "f1=123&f2=field11&f3=field12&f4=field13";
+        // f1 is declared as an int field in TEST_ROW_INFO, so this body cannot be converted.
+        String body2 = "f1=errormsg&f2=field21&f3=field22&f4=field23";
+
+        List<Row> actualRows = new ArrayList<>();
+        Collector<Row> collector = new ListCollector<>(actualRows);
+
+        deserializer.flatMap(buildMessage(TEST_ATTRS, body1, body2), collector);
+        assertEquals(1, errorHandler.getRowCount());
+
+        // These attributes carry no time attribute at all.
+        String abnormalAttrs = "m=0&streamId=testInterfaceId&__addcol1__=1&__addcol2__=2";
+        deserializer.flatMap(buildMessage(abnormalAttrs, body1, body2), collector);
+        assertEquals(1, errorHandler.getHeadCount());
+    }
+
+    @Test
+    public void testNormal() throws Exception {
+        String body1 = "f1=123&f2=field11&f3=field12&f4=field13";
+        String body2 = "f1=123&f2=field21&f3=field22&f4=field23";
+
+        Row expectedRow1 = expectedRow(1, 2, 123, "field11", "field12", "field13");
+        Row expectedRow2 = expectedRow(1, 2, 123, "field21", "field22", "field23");
+
+        InLongMsgKvFormatDeserializer deserializer =
+                new InLongMsgKvFormatDeserializer.Builder(TEST_ROW_INFO)
+                        .setTimeFieldName("inlongmsg_time")
+                        .setAttributesFieldName("inlongmsg_attributes")
+                        .build();
+        testRowDeserialization(
+                deserializer,
+                buildMessage(TEST_ATTRS, body1, body2),
+                Arrays.asList(expectedRow1, expectedRow2));
+    }
+
+    @Test
+    public void testNullField() throws Exception {
+        String body1 = "f1=123&f2=field11&f3=field12";
+        String body2 = "f1=123&f2=field21&f4=field23&f5=field24&=";
+
+        // Fields that are missing from a body are expected to be null.
+        Row expectedRow1 = expectedRow(1, 2, 123, "field11", "field12", null);
+        Row expectedRow2 = expectedRow(1, 2, 123, "field21", null, "field23");
+
+        InLongMsgKvFormatDeserializer deserializer =
+                new InLongMsgKvFormatDeserializer.Builder(TEST_ROW_INFO)
+                        .setTimeFieldName("inlongmsg_time")
+                        .setAttributesFieldName("inlongmsg_attributes")
+                        .build();
+        testRowDeserialization(
+                deserializer,
+                buildMessage(TEST_ATTRS, body1, body2),
+                Arrays.asList(expectedRow1, expectedRow2));
+    }
+
+    @Test
+    public void testNullField1() throws Exception {
+        String body1 = "f1=123&f2=field11&f3=field12";
+        String body2 = "f1=123&f2=field21&f4=field23&f5=field24&=";
+
+        // With predefined fields not retained, pf1 and pf2 are expected to be null.
+        Row expectedRow1 = expectedRow(null, null, 123, "field11", "field12", null);
+        Row expectedRow2 = expectedRow(null, null, 123, "field21", null, "field23");
+
+        InLongMsgKvFormatDeserializer deserializer =
+                new InLongMsgKvFormatDeserializer.Builder(TEST_ROW_INFO)
+                        .setTimeFieldName("inlongmsg_time")
+                        .setAttributesFieldName("inlongmsg_attributes")
+                        .setRetainPredefinedField(false)
+                        .build();
+        testRowDeserialization(
+                deserializer,
+                buildMessage(TEST_ATTRS, body1, body2),
+                Arrays.asList(expectedRow1, expectedRow2));
+    }
+
+    @Test
+    public void testLineDelimiter() throws Exception {
+        // A single body holding two lines, each expected to produce its own row.
+        String body = "f1=123&f2=field11\nf1=1&f2=2";
+
+        Row expectedRow1 = expectedRow(1, 2, 123, "field11", null, null);
+        Row expectedRow2 = expectedRow(1, 2, 1, "2", null, null);
+
+        InLongMsgKvFormatDeserializer deserializer =
+                new InLongMsgKvFormatDeserializer.Builder(TEST_ROW_INFO)
+                        .setTimeFieldName("inlongmsg_time")
+                        .setAttributesFieldName("inlongmsg_attributes")
+                        .setLineDelimiter('\n')
+                        .build();
+
+        testRowDeserialization(
+                deserializer,
+                buildMessage(TEST_ATTRS, body),
+                Arrays.asList(expectedRow1, expectedRow2));
+    }
+
+    /**
+     * Builds the serialized InLongMsg that holds the given bodies under the given attributes.
+     */
+    private static byte[] buildMessage(String attrs, String... bodies) throws Exception {
+        InLongMsg inLongMsg = InLongMsg.newInLongMsg();
+        for (String body : bodies) {
+            inLongMsg.addMsg(attrs, body.getBytes());
+        }
+        return inLongMsg.buildArray();
+    }
+
+    /**
+     * Returns the attribute map expected for messages built with {@link #TEST_ATTRS}.
+     */
+    private static Map<String, String> expectedAttributes() {
+        Map<String, String> expectedAttributes = new HashMap<>();
+        expectedAttributes.put("m", "0");
+        expectedAttributes.put("streamId", "testInterfaceId");
+        expectedAttributes.put("t", "20200322");
+        expectedAttributes.put("__addcol1__", "1");
+        expectedAttributes.put("__addcol2__", "2");
+        return expectedAttributes;
+    }
+
+    /**
+     * Returns the row expected for a message built with {@link #TEST_ATTRS}: the time,
+     * the attributes, and then the six data fields declared in {@link #TEST_ROW_INFO}.
+     */
+    private static Row expectedRow(
+            Object pf1, Object pf2, Object f1, Object f2, Object f3, Object f4) {
+        return Row.of(
+                Timestamp.valueOf("2020-03-22 00:00:00"),
+                expectedAttributes(),
+                pf1,
+                pf2,
+                f1,
+                f2,
+                f3,
+                f4);
+    }
+
+    private void testRowDeserialization(
+            InLongMsgKvFormatDeserializer deserializer,
+            byte[] bytes,
+            List<Row> expectedRows) throws Exception {
+        List<Row> actualRows = new ArrayList<>();
+        Collector<Row> collector = new ListCollector<>(actualRows);
+        deserializer.flatMap(bytes, collector);
+        assertEquals(expectedRows, actualRows);
+    }
+
+    /**
+     * A failure handler that only counts how often each callback is invoked.
+     */
+    private static class TestFailureHandler implements FailureHandler {
+
+        private int headCount = 0;
+        private int bodyCount = 0;
+        private int rowCount = 0;
+
+        public int getHeadCount() {
+            return headCount;
+        }
+
+        public int getBodyCount() {
+            return bodyCount;
+        }
+
+        public int getRowCount() {
+            return rowCount;
+        }
+
+        @Override
+        public void onParsingHeadFailure(String attribute, Exception exception) throws Exception {
+            headCount++;
+        }
+
+        @Override
+        public void onParsingBodyFailure(byte[] body, Exception exception) throws Exception {
+            bodyCount++;
+        }
+
+        @Override
+        public void onConvertingRowFailure(InLongMsgHead head,
+                InLongMsgBody body,
+                Exception exception) throws Exception {
+            rowCount++;
+        }
+    }
+}
diff --git
a/inlong-sort/sort-formats/format-inlongmsg-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatFactoryTest.java
b/inlong-sort/sort-formats/format-inlongmsg-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatFactoryTest.java
new file mode 100644
index 0000000000..54352acb3c
--- /dev/null
+++
b/inlong-sort/sort-formats/format-inlongmsg-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatFactoryTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsgkv;
+
+import org.apache.inlong.sort.formats.base.TableFormatDeserializer;
+import org.apache.inlong.sort.formats.base.TableFormatUtils;
+import org.apache.inlong.sort.formats.common.DateFormatInfo;
+import org.apache.inlong.sort.formats.common.FormatInfo;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.RowFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.Types;
+import org.apache.flink.table.descriptors.Schema;
+import org.apache.flink.types.Row;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
+import static org.apache.inlong.sort.formats.inlongmsgkv.InLongMsgKvUtils.DEFAULT_INLONGMSGKV_CHARSET;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Tests for {@link InLongMsgKvFormatFactory}.
+ */
+public class InLongMsgKvFormatFactoryTest {
+
+ private static final TypeInformation<Row> SCHEMA =
+ Types.ROW(
+ new String[]{"time", "attributes", "student_name", "score", "date"},
+ new TypeInformation[]{
+ Types.SQL_TIMESTAMP(),
+ Types.MAP(Types.STRING(), Types.STRING()),
+ Types.STRING(),
+ Types.INT(),
+ Types.SQL_DATE()
+ });
+
+ private static final RowFormatInfo TEST_FORMAT_SCHEMA =
+ new RowFormatInfo(
+ new String[]{"student_name", "score", "date"},
+ new FormatInfo[]{
+ StringFormatInfo.INSTANCE,
+ IntFormatInfo.INSTANCE,
+ new DateFormatInfo("yyyy-MM-dd")
+ });
+
+ @Test
+ public void testCreateTableFormatDeserializer() throws Exception {
+ final Map<String, String> properties =
+ new InLongMsgKv()
+ .schema(TEST_FORMAT_SCHEMA)
+ .entryDelimiter('&')
+ .kvDelimiter('=')
+ .escapeCharacter('\\')
+ .quoteCharacter('\"')
+ .nullLiteral("null")
+ .toProperties();
+ assertNotNull(properties);
+
+ final InLongMsgKvFormatDeserializer expectedDeser =
+ new InLongMsgKvFormatDeserializer(
+ TEST_FORMAT_SCHEMA,
+ DEFAULT_TIME_FIELD_NAME,
+ DEFAULT_ATTRIBUTES_FIELD_NAME,
+ DEFAULT_INLONGMSGKV_CHARSET,
+ '&',
+ '=',
+ null,
+ '\\',
+ '\"',
+ "null",
+ false);
+
+ final TableFormatDeserializer actualDeser =
+ TableFormatUtils.getTableFormatDeserializer(
+ properties,
+ getClass().getClassLoader());
+
+ assertEquals(expectedDeser, actualDeser);
+ }
+
+ @Test(expected = Exception.class)
+ public void testCreateTableFormatDeserializerWithDerivation() {
+ final Map<String, String> properties = new HashMap<>();
+ properties.putAll(
+ new Schema()
+ .schema(TableSchema.fromTypeInfo(SCHEMA))
+ .toProperties());
+ properties.putAll(new InLongMsgKv().deriveSchema().toProperties());
+
+ final InLongMsgKvFormatDeserializer expectedDeser =
+ new InLongMsgKvFormatDeserializer.Builder(TEST_FORMAT_SCHEMA).build();
+
+ final TableFormatDeserializer actualDeser =
+ TableFormatUtils.getTableFormatDeserializer(
+ properties,
+ getClass().getClassLoader());
+
+ assertEquals(expectedDeser, actualDeser);
+ }
+}
diff --git a/inlong-sort/sort-formats/format-inlongmsg-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvTest.java b/inlong-sort/sort-formats/format-inlongmsg-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvTest.java
new file mode 100644
index 0000000000..8b84fa125c
--- /dev/null
+++ b/inlong-sort/sort-formats/format-inlongmsg-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsgkv;
+
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgValidator;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.descriptors.Descriptor;
+import org.apache.flink.table.descriptors.DescriptorTestBase;
+import org.apache.flink.table.descriptors.DescriptorValidator;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tests for the {@link InLongMsgKv} descriptor.
+ */
+public class InLongMsgKvTest extends DescriptorTestBase {
+
+ private static final String TEST_SCHEMA =
+ "{" +
+ "\"type\":\"row\"," +
+ "\"fieldFormats\":[{" +
+ "\"name\":\"student_name\"," +
+ "\"format\":{\"type\":\"string\"}" +
+ "},{" +
+ "\"name\":\"score\"," +
+ "\"format\":{\"type\":\"int\"}" +
+ "},{" +
+ "\"name\":\"date\"," +
+ "\"format\":{" +
+ "\"type\":\"date\"," +
+ "\"format\":\"yyyy-MM-dd\"" +
+ "}" +
+ "}]" +
+ "}";
+
+ private static final Descriptor CUSTOM_DESCRIPTOR_WITH_SCHEMA =
+ new InLongMsgKv()
+ .schema(TEST_SCHEMA)
+ .timeFieldName("time")
+ .attributesFieldName("attributes")
+ .entryDelimiter('&')
+ .kvDelimiter('=')
+ .lineDelimiter('\n')
+ .charset(StandardCharsets.UTF_8)
+ .escapeCharacter('\\')
+ .quoteCharacter('\"')
+ .nullLiteral("n/a")
+ .ignoreErrors();
+
+ @Test(expected = ValidationException.class)
+ public void testInvalidIgnoreParseErrors() {
+ addPropertyAndVerify(CUSTOM_DESCRIPTOR_WITH_SCHEMA, "format.escape-character", "DDD");
+ }
+
+ @Test(expected = ValidationException.class)
+ public void testMissingSchema() {
+ removePropertyAndVerify(CUSTOM_DESCRIPTOR_WITH_SCHEMA, "format.schema");
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public List<Descriptor> descriptors() {
+ return Collections.singletonList(CUSTOM_DESCRIPTOR_WITH_SCHEMA);
+ }
+
+ @Override
+ public List<Map<String, String>> properties() {
+ final Map<String, String> props1 = new HashMap<>();
+ props1.put("format.type", "InLongMsg-KV");
+ props1.put("format.property-version", "1");
+ props1.put("format.schema", TEST_SCHEMA);
+ props1.put("format.time-field-name", "time");
+ props1.put("format.attributes-field-name", "attributes");
+ props1.put("format.entry-delimiter", "&");
+ props1.put("format.kv-delimiter", "=");
+ props1.put("format.line-delimiter", "\n");
+ props1.put("format.charset", "UTF-8");
+ props1.put("format.escape-character", "\\");
+ props1.put("format.quote-character", "\"");
+ props1.put("format.null-literal", "n/a");
+ props1.put("format.ignore-errors", "true");
+
+ return Collections.singletonList(props1);
+ }
+
+ @Override
+ public DescriptorValidator validator() {
+ return new InLongMsgValidator();
+ }
+}
diff --git a/inlong-sort/sort-formats/pom.xml b/inlong-sort/sort-formats/pom.xml
index 7e6f4927ea..3f668091db 100644
--- a/inlong-sort/sort-formats/pom.xml
+++ b/inlong-sort/sort-formats/pom.xml
@@ -37,6 +37,7 @@
<module>format-kv</module>
<module>format-inlongmsg-base</module>
<module>format-inlongmsg-csv</module>
+ <module>format-inlongmsg-kv</module>
<module>format-inlongmsg-pb</module>
</modules>