This is an automated email from the ASF dual-hosted git repository.
luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 7148c1cfaa [INLONG-10784][Sort] Refactor the structure of inlongmsg-rowdata (#10786)
7148c1cfaa is described below
commit 7148c1cfaa6208116a776694564bd0506825eeb0
Author: XiaoYou201 <[email protected]>
AuthorDate: Wed Aug 14 12:11:56 2024 +0800
[INLONG-10784][Sort] Refactor the structure of inlongmsg-rowdata (#10786)
* [INLONG-10784][Sort] Refactor the structure of inlongmsg-rowdata
* [INLONG-10784][Sort] fix format
* [INLONG-10784][Sort] fix format
* [INLONG-10784][Sort] fix format
* [INLONG-10784][Sort] Rename row module structure
* Revert "[INLONG-10784][Sort] Rename row module structure"
This reverts commit 9ee654784ca6e213b642a584e7e5d78572925366.
* Revert "[INLONG-10784][Sort] fix format"
This reverts commit 32dcc771c1c2a53f14602537e38f7922ca620d8c.
* Revert "[INLONG-10784][Sort] fix format"
This reverts commit dd45c0fc26d6f7073d6d214ef36dc0d95ac22903.
* Revert "[INLONG-10784][Sort] fix format"
This reverts commit 4d1e2b0e8b74effa096a7612d2aa954882a519a2.
* Revert "[INLONG-10784][Sort] Refactor the structure of inlongmsg-rowdata"
This reverts commit a69075dfbb77e28fb2e3b6b7b6e41cbf89eb06de.
* [INLONG-10784][Sort] Refactor the Row module structure
---
.../AbstractInLongMsgFormatDeserializer.java | 5 ++++-
.../AbstractInLongMsgMixedFormatConverter.java | 2 +-
.../AbstractInLongMsgMixedFormatDeserializer.java | 4 +++-
.../inlongmsg/{ => row}/InLongMsgDecodingFormat.java | 5 +++--
.../{ => row}/InLongMsgDeserializationSchema.java | 5 +++--
.../inlongmsg/{ => row}/InLongMsgFormatFactory.java | 2 +-
.../{ => row}/InLongMsgMixedFormatConverter.java | 2 +-
.../InLongMsgMixedFormatConverterBuilder.java | 10 +++++-----
.../InLongMsgMixedFormatConverterValidator.java | 4 ++--
.../InLongMsgMixedFormatDeserializerValidator.java | 2 +-
.../{ => row}/InLongMsgMixedFormatFactory.java | 2 +-
.../InLongMsgTextMixedFormatDeserializerBuilder.java | 2 +-
.../formats/inlongmsg/{ => row}/InLongMsgUtils.java | 7 ++++++-
.../org.apache.flink.table.factories.Factory | 2 +-
.../inlongmsg/InLongMsgFormatFactoryTest.java | 3 ++-
.../formats/inlongmsg/InLongMsgRowDataSerDeTest.java | 1 +
.../formats/inlongmsgbinlog/InLongMsgBinlog.java | 4 ++--
.../InLongMsgBinlogFormatBuilder.java | 8 ++++----
.../InLongMsgBinlogFormatDeserializer.java | 4 ++--
.../InLongMsgBinlogFormatFactory.java | 14 +++++++-------
.../InLongMsgBinlogMixedFormatConverter.java | 2 +-
.../InLongMsgBinlogMixedFormatDeserializer.java | 4 ++--
.../inlongmsgbinlog/InLongMsgBinlogUtils.java | 20 ++++++++++----------
.../inlongmsgbinlog/InLongMsgBinlogValidator.java | 4 ++--
.../InLongMsgBinlogFormatDeserializerTest.java | 4 ++--
.../sort/formats/inlongmsgcsv/InLongMsgCsv.java | 6 +++---
.../inlongmsgcsv/InLongMsgCsvFormatDeserializer.java | 16 ++++++++--------
.../inlongmsgcsv/InLongMsgCsvFormatFactory.java | 18 +++++++++---------
.../InLongMsgCsvMixedFormatConverter.java | 6 +++---
.../InLongMsgCsvMixedFormatDeserializer.java | 6 +++---
.../sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java | 18 +++++++++---------
.../formats/inlongmsgcsv/InLongMsgCsvValidator.java | 2 +-
.../InLongMsgCsvFormatDeserializerTest.java | 4 ++--
.../inlongmsgcsv/InLongMsgCsvFormatFactoryTest.java | 2 +-
.../inlong/sort/formats/inlongmsgkv/InLongMsgKv.java | 6 +++---
.../inlongmsgkv/InLongMsgKvFormatDeserializer.java | 16 ++++++++--------
.../inlongmsgkv/InLongMsgKvFormatFactory.java | 18 +++++++++---------
.../inlongmsgkv/InLongMsgKvMixedFormatConverter.java | 6 +++---
.../InLongMsgKvMixedFormatDeserializer.java | 6 +++---
.../sort/formats/inlongmsgkv/InLongMsgKvUtils.java | 18 +++++++++---------
.../formats/inlongmsgkv/InLongMsgKvValidator.java | 2 +-
.../inlongmsgkv/InLongMsgKvFormatFactoryTest.java | 4 ++--
.../formats/inlongmsgtlogcsv/InLongMsgTlogCsv.java | 2 +-
.../InLongMsgTlogCsvFormatDeserializer.java | 12 ++++++------
.../InLongMsgTlogCsvFormatFactory.java | 16 ++++++++--------
.../InLongMsgTlogCsvMixedFormatConverter.java | 6 +++---
.../InLongMsgTlogCsvMixedFormatDeserializer.java | 6 +++---
.../inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java | 12 ++++++------
.../inlongmsgtlogcsv/InLongMsgTlogCsvValidator.java | 2 +-
.../InLongMsgTlogCsvFormatDeserializerTest.java | 4 ++--
.../InLongMsgTlogCsvFormatFactoryTest.java | 2 +-
.../formats/inlongmsgtlogkv/InLongMsgTlogKv.java | 2 +-
.../InLongMsgTlogKvFormatDeserializer.java | 12 ++++++------
.../InLongMsgTlogKvFormatFactory.java | 16 ++++++++--------
.../InLongMsgTlogKvMixedFormatConverter.java | 6 +++---
.../InLongMsgTlogKvMixedFormatDeserializer.java | 6 +++---
.../inlongmsgtlogkv/InLongMsgTlogKvUtils.java | 12 ++++++------
.../inlongmsgtlogkv/InLongMsgTlogKvValidator.java | 2 +-
.../InLongMsgTlogKvFormatDeserializerTest.java | 4 ++--
.../InLongMsgTlogKvFormatFactoryTest.java | 2 +-
60 files changed, 207 insertions(+), 193 deletions(-)
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgFormatDeserializer.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/AbstractInLongMsgFormatDeserializer.java
similarity index 94%
rename from
inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgFormatDeserializer.java
rename to
inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/AbstractInLongMsgFormatDeserializer.java
index 79378468a5..4c264ffd4b 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgFormatDeserializer.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/AbstractInLongMsgFormatDeserializer.java
@@ -15,10 +15,13 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.formats.inlongmsg;
+package org.apache.inlong.sort.formats.inlongmsg.row;
import org.apache.inlong.common.msg.InLongMsg;
import org.apache.inlong.sort.formats.base.TableFormatDeserializer;
+import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgMixedFormatConverter.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/AbstractInLongMsgMixedFormatConverter.java
similarity index 98%
rename from
inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgMixedFormatConverter.java
rename to
inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/AbstractInLongMsgMixedFormatConverter.java
index 165f6532fb..96c91776ff 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgMixedFormatConverter.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/AbstractInLongMsgMixedFormatConverter.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.formats.inlongmsg;
+package org.apache.inlong.sort.formats.inlongmsg.row;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.types.Row;
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgMixedFormatDeserializer.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/AbstractInLongMsgMixedFormatDeserializer.java
similarity index 91%
rename from
inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgMixedFormatDeserializer.java
rename to
inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/AbstractInLongMsgMixedFormatDeserializer.java
index d2d0b7ca3f..00ff3ecf3b 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgMixedFormatDeserializer.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/AbstractInLongMsgMixedFormatDeserializer.java
@@ -15,7 +15,9 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.formats.inlongmsg;
+package org.apache.inlong.sort.formats.inlongmsg.row;
+
+import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
import javax.annotation.Nonnull;
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDecodingFormat.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgDecodingFormat.java
similarity index 98%
rename from
inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDecodingFormat.java
rename to
inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgDecodingFormat.java
index 779ff6b8e8..f3471596c7 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDecodingFormat.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgDecodingFormat.java
@@ -15,9 +15,10 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.formats.inlongmsg;
+package org.apache.inlong.sort.formats.inlongmsg.row;
-import org.apache.inlong.sort.formats.inlongmsg.InLongMsgDeserializationSchema.MetadataConverter;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
+import org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgDeserializationSchema.MetadataConverter;
import com.google.common.annotations.VisibleForTesting;
import lombok.extern.slf4j.Slf4j;
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDeserializationSchema.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgDeserializationSchema.java
similarity index 97%
rename from
inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDeserializationSchema.java
rename to
inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgDeserializationSchema.java
index d3a27c62be..fd5d01f609 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDeserializationSchema.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgDeserializationSchema.java
@@ -15,10 +15,11 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.formats.inlongmsg;
+package org.apache.inlong.sort.formats.inlongmsg.row;
import org.apache.inlong.common.msg.InLongMsg;
import org.apache.inlong.sort.formats.base.collectors.TimestampedCollector;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
import com.google.common.base.Objects;
import org.apache.flink.api.common.functions.util.ListCollector;
@@ -152,7 +153,7 @@ public class InLongMsgDeserializationSchema implements DeserializationSchema<Row
return Objects.hashCode(deserializationSchema, metadataConverters,
producedTypeInfo, ignoreErrors);
}
- interface MetadataConverter extends Serializable {
+ public interface MetadataConverter extends Serializable {
Object read(InLongMsgHead head);
}
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgFormatFactory.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgFormatFactory.java
similarity index 98%
rename from
inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgFormatFactory.java
rename to
inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgFormatFactory.java
index c7caa43290..7b68dbe7ec 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgFormatFactory.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgFormatFactory.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.formats.inlongmsg;
+package org.apache.inlong.sort.formats.inlongmsg.row;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedFormatConverter.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgMixedFormatConverter.java
similarity index 95%
rename from
inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedFormatConverter.java
rename to
inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgMixedFormatConverter.java
index b5819503e0..bbade1c3b1 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedFormatConverter.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgMixedFormatConverter.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.formats.inlongmsg;
+package org.apache.inlong.sort.formats.inlongmsg.row;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedFormatConverterBuilder.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgMixedFormatConverterBuilder.java
similarity index 87%
rename from
inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedFormatConverterBuilder.java
rename to
inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgMixedFormatConverterBuilder.java
index 2e289d4faa..5719eea0c1 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedFormatConverterBuilder.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgMixedFormatConverterBuilder.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.formats.inlongmsg;
+package org.apache.inlong.sort.formats.inlongmsg.row;
import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
@@ -25,10 +25,10 @@ import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_I
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_NULL_LITERAL;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_IGNORE_ERRORS;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_NULL_LITERAL;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
/**
* The builder for {@link AbstractInLongMsgMixedFormatConverter}s.
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedFormatConverterValidator.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgMixedFormatConverterValidator.java
similarity index 91%
rename from
inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedFormatConverterValidator.java
rename to
inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgMixedFormatConverterValidator.java
index d793dff7fc..d06345e23a 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedFormatConverterValidator.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgMixedFormatConverterValidator.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.formats.inlongmsg;
+package org.apache.inlong.sort.formats.inlongmsg.row;
import org.apache.inlong.sort.formats.base.FormatDescriptorValidator;
@@ -23,7 +23,7 @@ import
org.apache.flink.table.descriptors.DescriptorProperties;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_IGNORE_ERRORS;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_NULL_LITERAL;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.validateInLongMsgSchema;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.validateInLongMsgSchema;
/**
* The validator for the properties of {@link
AbstractInLongMsgMixedFormatConverter}s.
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedFormatDeserializerValidator.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgMixedFormatDeserializerValidator.java
similarity index 97%
rename from
inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedFormatDeserializerValidator.java
rename to
inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgMixedFormatDeserializerValidator.java
index d04a7e32b5..5360813bff 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedFormatDeserializerValidator.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgMixedFormatDeserializerValidator.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.formats.inlongmsg;
+package org.apache.inlong.sort.formats.inlongmsg.row;
import org.apache.inlong.sort.formats.base.FormatDescriptorValidator;
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedFormatFactory.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgMixedFormatFactory.java
similarity index 98%
rename from
inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedFormatFactory.java
rename to
inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgMixedFormatFactory.java
index 13bd4004ef..a181f8d813 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedFormatFactory.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgMixedFormatFactory.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.formats.inlongmsg;
+package org.apache.inlong.sort.formats.inlongmsg.row;
import java.util.Map;
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgTextMixedFormatDeserializerBuilder.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgTextMixedFormatDeserializerBuilder.java
similarity index 98%
rename from
inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgTextMixedFormatDeserializerBuilder.java
rename to
inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgTextMixedFormatDeserializerBuilder.java
index 10f74065e0..7d0773d9d4 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgTextMixedFormatDeserializerBuilder.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgTextMixedFormatDeserializerBuilder.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.formats.inlongmsg;
+package org.apache.inlong.sort.formats.inlongmsg.row;
import org.apache.flink.table.descriptors.DescriptorProperties;
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgUtils.java
similarity index 98%
rename from
inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
rename to
inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgUtils.java
index 1a85eddd1e..00391b2911 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgUtils.java
@@ -15,13 +15,18 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.formats.inlongmsg;
+package org.apache.inlong.sort.formats.inlongmsg.row;
import org.apache.inlong.common.msg.InLongMsg;
import org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo;
import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
import org.apache.inlong.sort.formats.base.TableFormatConstants;
import org.apache.inlong.sort.formats.base.TableFormatForRowUtils;
+import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
+import org.apache.inlong.sort.formats.inlongmsg.IgnoreFailureHandler;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
+import org.apache.inlong.sort.formats.inlongmsg.NoOpFailureHandler;
import org.apache.inlong.sort.formats.util.StringUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 0aa99381d8..a0bdce2d50 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -13,4 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.inlong.sort.formats.inlongmsg.InLongMsgFormatFactory
+org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgFormatFactory
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/test/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgFormatFactoryTest.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/test/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgFormatFactoryTest.java
index 6232a13ebc..1f88226a98 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/test/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgFormatFactoryTest.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/test/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgFormatFactoryTest.java
@@ -17,7 +17,8 @@
package org.apache.inlong.sort.formats.inlongmsg;
-import org.apache.inlong.sort.formats.inlongmsg.InLongMsgDeserializationSchema.MetadataConverter;
+import org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgDeserializationSchema;
+import org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgDeserializationSchema.MetadataConverter;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/test/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgRowDataSerDeTest.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/test/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgRowDataSerDeTest.java
index 64985985ab..0429e6f4aa 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/test/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgRowDataSerDeTest.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/test/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgRowDataSerDeTest.java
@@ -18,6 +18,7 @@
package org.apache.inlong.sort.formats.inlongmsg;
import org.apache.inlong.common.msg.InLongMsg;
+import org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgFormatFactory;
import com.google.common.collect.ImmutableList;
import org.apache.flink.api.common.functions.util.ListCollector;
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlog.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlog.java
index cea8c011d0..7d9beb1402 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlog.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlog.java
@@ -31,8 +31,8 @@ import java.util.Map;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_IGNORE_ERRORS;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_SCHEMA;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
import static
org.apache.inlong.sort.formats.inlongmsgbinlog.InLongMsgBinlogUtils.FORMAT_METADATA_FIELD_NAME;
/**
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatBuilder.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatBuilder.java
index 57c795548a..b13cdd822c 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatBuilder.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatBuilder.java
@@ -23,10 +23,10 @@ import
org.apache.flink.table.descriptors.DescriptorProperties;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_IGNORE_ERRORS;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_IGNORE_ERRORS;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
import static
org.apache.inlong.sort.formats.inlongmsgbinlog.InLongMsgBinlogUtils.DEFAULT_INCLUDE_UPDATE_BEFORE;
import static
org.apache.inlong.sort.formats.inlongmsgbinlog.InLongMsgBinlogUtils.DEFAULT_METADATA_FIELD_NAME;
import static
org.apache.inlong.sort.formats.inlongmsgbinlog.InLongMsgBinlogUtils.FORMAT_INCLUDE_UPDATE_BEFORE;
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializer.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializer.java
index 866a6be6ce..5128d51be5 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializer.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializer.java
@@ -18,11 +18,11 @@
package org.apache.inlong.sort.formats.inlongmsgbinlog;
import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
-import
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgFormatDeserializer;
import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
-import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
+import
org.apache.inlong.sort.formats.inlongmsg.row.AbstractInLongMsgFormatDeserializer;
+import org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.types.Row;
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatFactory.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatFactory.java
index 45b9992837..893a70f460 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatFactory.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatFactory.java
@@ -20,11 +20,11 @@ package org.apache.inlong.sort.formats.inlongmsgbinlog;
import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
import org.apache.inlong.sort.formats.base.TableFormatDeserializer;
import org.apache.inlong.sort.formats.base.TableFormatDeserializerFactory;
-import
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgFormatDeserializer;
-import
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgMixedFormatConverter;
-import
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgMixedFormatDeserializer;
-import
org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatDeserializerValidator;
-import org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatFactory;
+import
org.apache.inlong.sort.formats.inlongmsg.row.AbstractInLongMsgFormatDeserializer;
+import
org.apache.inlong.sort.formats.inlongmsg.row.AbstractInLongMsgMixedFormatConverter;
+import
org.apache.inlong.sort.formats.inlongmsg.row.AbstractInLongMsgMixedFormatDeserializer;
+import
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgMixedFormatDeserializerValidator;
+import
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgMixedFormatFactory;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.factories.TableFormatFactoryBase;
@@ -37,8 +37,8 @@ import java.util.Map;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_IGNORE_ERRORS;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_IGNORE_ERRORS;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_SCHEMA;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
import static
org.apache.inlong.sort.formats.inlongmsgbinlog.InLongMsgBinlogUtils.FORMAT_INCLUDE_UPDATE_BEFORE;
import static
org.apache.inlong.sort.formats.inlongmsgbinlog.InLongMsgBinlogUtils.FORMAT_METADATA_FIELD_NAME;
import static
org.apache.inlong.sort.formats.inlongmsgbinlog.InLongMsgBinlogUtils.getDataRowFormatInfo;
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogMixedFormatConverter.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogMixedFormatConverter.java
index 9bd4cf3c61..9f65d086d2 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogMixedFormatConverter.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogMixedFormatConverter.java
@@ -18,7 +18,7 @@
package org.apache.inlong.sort.formats.inlongmsgbinlog;
import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
-import
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgMixedFormatConverter;
+import
org.apache.inlong.sort.formats.inlongmsg.row.AbstractInLongMsgMixedFormatConverter;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.types.Row;
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogMixedFormatDeserializer.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogMixedFormatDeserializer.java
index 59d4959991..76ba972640 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogMixedFormatDeserializer.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogMixedFormatDeserializer.java
@@ -17,11 +17,11 @@
package org.apache.inlong.sort.formats.inlongmsgbinlog;
-import
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgMixedFormatDeserializer;
import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
-import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
+import
org.apache.inlong.sort.formats.inlongmsg.row.AbstractInLongMsgMixedFormatDeserializer;
+import org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.types.Row;
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
index 40f398d6c5..59dbd84667 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
@@ -42,16 +42,16 @@ import java.util.Set;
import static
org.apache.inlong.sort.formats.base.TableFormatUtils.deserializeBasicField;
import static
org.apache.inlong.sort.formats.base.TableFormatUtils.deserializeRowFormatInfo;
import static org.apache.inlong.sort.formats.base.TableFormatUtils.getType;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_INTERFACE_NAME;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_STREAM_ID;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TID;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.getPredefinedFields;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseAttr;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseEpochTime;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.INLONGMSG_ATTR_INTERFACE_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.INLONGMSG_ATTR_STREAM_ID;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.INLONGMSG_ATTR_TID;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.getPredefinedFields;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.parseAttr;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.parseEpochTime;
public class InLongMsgBinlogUtils {
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogValidator.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogValidator.java
index d8957ee142..90c1424bd5 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogValidator.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogValidator.java
@@ -23,8 +23,8 @@ import
org.apache.flink.table.descriptors.DescriptorProperties;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_IGNORE_ERRORS;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_SCHEMA;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
import static
org.apache.inlong.sort.formats.inlongmsgbinlog.InLongMsgBinlogUtils.FORMAT_INCLUDE_UPDATE_BEFORE;
import static
org.apache.inlong.sort.formats.inlongmsgbinlog.InLongMsgBinlogUtils.FORMAT_METADATA_FIELD_NAME;
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/test/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializerTest.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/test/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializerTest.java
index 8fc0950709..6035073272 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/test/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializerTest.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/test/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializerTest.java
@@ -27,8 +27,8 @@ import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.types.Row;
import org.junit.Test;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
import static
org.apache.inlong.sort.formats.inlongmsgbinlog.InLongMsgBinlogUtils.DEFAULT_METADATA_FIELD_NAME;
import static org.junit.Assert.assertEquals;
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsv.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsv.java
index 96e807a327..61b368eec0 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsv.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsv.java
@@ -22,9 +22,9 @@ import
org.apache.inlong.sort.formats.base.TextFormatDescriptor;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_DELIMITER;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_LINE_DELIMITER;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_RETAIN_PREDEFINED_FIELD;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_RETAIN_PREDEFINED_FIELD;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
import static
org.apache.inlong.sort.formats.inlongmsgcsv.InLongMsgCsvUtils.FORMAT_DELETE_HEAD_DELIMITER;
/**
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java
index e43266e358..ab6a748056 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java
@@ -19,11 +19,11 @@ package org.apache.inlong.sort.formats.inlongmsgcsv;
import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
import org.apache.inlong.sort.formats.base.TextFormatBuilder;
-import
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgFormatDeserializer;
import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
-import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
+import
org.apache.inlong.sort.formats.inlongmsg.row.AbstractInLongMsgFormatDeserializer;
+import org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.descriptors.DescriptorProperties;
@@ -40,12 +40,12 @@ import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_D
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_LINE_DELIMITER;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_DELIMITER;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_LINE_DELIMITER;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_PREDEFINED_FIELD;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_RETAIN_PREDEFINED_FIELD;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.DEFAULT_PREDEFINED_FIELD;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_RETAIN_PREDEFINED_FIELD;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
import static
org.apache.inlong.sort.formats.inlongmsgcsv.InLongMsgCsvUtils.DEFAULT_DELETE_HEAD_DELIMITER;
import static
org.apache.inlong.sort.formats.inlongmsgcsv.InLongMsgCsvUtils.FORMAT_DELETE_HEAD_DELIMITER;
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatFactory.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatFactory.java
index 8e21aa1d51..f62a0e0bed 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatFactory.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatFactory.java
@@ -20,11 +20,11 @@ package org.apache.inlong.sort.formats.inlongmsgcsv;
import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
import org.apache.inlong.sort.formats.base.TableFormatDeserializer;
import org.apache.inlong.sort.formats.base.TableFormatDeserializerFactory;
-import
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgMixedFormatConverter;
-import
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgMixedFormatDeserializer;
-import
org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatConverterValidator;
-import
org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatDeserializerValidator;
-import org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatFactory;
+import
org.apache.inlong.sort.formats.inlongmsg.row.AbstractInLongMsgMixedFormatConverter;
+import
org.apache.inlong.sort.formats.inlongmsg.row.AbstractInLongMsgMixedFormatDeserializer;
+import
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgMixedFormatConverterValidator;
+import
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgMixedFormatDeserializerValidator;
+import
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgMixedFormatFactory;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.factories.TableFormatFactoryBase;
@@ -42,10 +42,10 @@ import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_LI
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_NULL_LITERAL;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_QUOTE_CHARACTER;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_SCHEMA;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_RETAIN_PREDEFINED_FIELD;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.getDataRowFormatInfo;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_RETAIN_PREDEFINED_FIELD;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.getDataRowFormatInfo;
import static
org.apache.inlong.sort.formats.inlongmsgcsv.InLongMsgCsv.FORMAT_TYPE_VALUE;
import static
org.apache.inlong.sort.formats.inlongmsgcsv.InLongMsgCsvUtils.FORMAT_DELETE_HEAD_DELIMITER;
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvMixedFormatConverter.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvMixedFormatConverter.java
index ea9f691cc6..6b4400f548 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvMixedFormatConverter.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvMixedFormatConverter.java
@@ -18,9 +18,9 @@
package org.apache.inlong.sort.formats.inlongmsgcsv;
import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
-import
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgMixedFormatConverter;
-import
org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatConverterBuilder;
-import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
+import
org.apache.inlong.sort.formats.inlongmsg.row.AbstractInLongMsgMixedFormatConverter;
+import
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgMixedFormatConverterBuilder;
+import org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.types.Row;
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvMixedFormatDeserializer.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvMixedFormatDeserializer.java
index 194ca2fa85..f78d094d77 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvMixedFormatDeserializer.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvMixedFormatDeserializer.java
@@ -17,12 +17,12 @@
package org.apache.inlong.sort.formats.inlongmsgcsv;
-import
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgMixedFormatDeserializer;
import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
-import
org.apache.inlong.sort.formats.inlongmsg.InLongMsgTextMixedFormatDeserializerBuilder;
-import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
+import
org.apache.inlong.sort.formats.inlongmsg.row.AbstractInLongMsgMixedFormatDeserializer;
+import
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgTextMixedFormatDeserializerBuilder;
+import org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.descriptors.DescriptorProperties;
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
index caa82e8faf..c347f5861b 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
@@ -35,15 +35,15 @@ import java.util.Map;
import java.util.stream.Collectors;
import static
org.apache.inlong.sort.formats.base.TableFormatUtils.deserializeBasicField;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_INTERFACE_NAME;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_STREAM_ID;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TID;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TIME_DT;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TIME_T;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.getPredefinedFields;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseAttr;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseDateTime;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseEpochTime;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.INLONGMSG_ATTR_INTERFACE_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.INLONGMSG_ATTR_STREAM_ID;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.INLONGMSG_ATTR_TID;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.INLONGMSG_ATTR_TIME_DT;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.INLONGMSG_ATTR_TIME_T;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.getPredefinedFields;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.parseAttr;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.parseDateTime;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.parseEpochTime;
import static org.apache.inlong.sort.formats.util.StringUtils.splitCsv;
/**
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvValidator.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvValidator.java
index d478e4ac06..a08303f6c1 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvValidator.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvValidator.java
@@ -23,7 +23,7 @@ import
org.apache.flink.table.descriptors.DescriptorProperties;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_DELIMITER;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_LINE_DELIMITER;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.validateInLongMsgSchema;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.validateInLongMsgSchema;
import static
org.apache.inlong.sort.formats.inlongmsgcsv.InLongMsgCsvUtils.FORMAT_DELETE_HEAD_DELIMITER;
/**
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java
index a6aaa9c8d0..8508aaa90e 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java
@@ -44,8 +44,8 @@ import java.util.Map;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_CHARSET;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_DELIMITER;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
import static
org.apache.inlong.sort.formats.inlongmsgcsv.InLongMsgCsvUtils.DEFAULT_DELETE_HEAD_DELIMITER;
import static org.junit.Assert.assertEquals;
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatFactoryTest.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatFactoryTest.java
index e0d576d604..6dd23a904a 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatFactoryTest.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatFactoryTest.java
@@ -25,7 +25,7 @@ import
org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
import
org.apache.inlong.common.pojo.sort.dataflow.field.format.StringFormatInfo;
import org.apache.inlong.sort.formats.base.TableFormatDeserializer;
import org.apache.inlong.sort.formats.base.TableFormatForRowUtils;
-import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
+import org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.TableSchema;
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKv.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKv.java
index 1a1b93cc16..934ecc66a7 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKv.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKv.java
@@ -23,9 +23,9 @@ import static
org.apache.flink.util.Preconditions.checkNotNull;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_ENTRY_DELIMITER;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_KV_DELIMITER;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_LINE_DELIMITER;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_RETAIN_PREDEFINED_FIELD;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_RETAIN_PREDEFINED_FIELD;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
/**
* Format descriptor for KVs.
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializer.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializer.java
index 24a8f09ec0..1fecb43d5f 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializer.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializer.java
@@ -19,11 +19,11 @@ package org.apache.inlong.sort.formats.inlongmsgkv;
import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
import org.apache.inlong.sort.formats.base.TextFormatBuilder;
-import
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgFormatDeserializer;
import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
-import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
+import
org.apache.inlong.sort.formats.inlongmsg.row.AbstractInLongMsgFormatDeserializer;
+import org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.descriptors.DescriptorProperties;
@@ -42,12 +42,12 @@ import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_L
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_ENTRY_DELIMITER;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_KV_DELIMITER;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_LINE_DELIMITER;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_PREDEFINED_FIELD;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_RETAIN_PREDEFINED_FIELD;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.DEFAULT_PREDEFINED_FIELD;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_RETAIN_PREDEFINED_FIELD;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
import static
org.apache.inlong.sort.formats.inlongmsgkv.InLongMsgKvUtils.DEFAULT_INLONGMSGKV_CHARSET;
/**
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatFactory.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatFactory.java
index 405069112f..be440b9522 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatFactory.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatFactory.java
@@ -21,11 +21,11 @@ import
org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
import org.apache.inlong.sort.formats.base.TableFormatDeserializer;
import
org.apache.inlong.sort.formats.base.TableFormatDeserializer.TableFormatContext;
import org.apache.inlong.sort.formats.base.TableFormatDeserializerFactory;
-import
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgMixedFormatConverter;
-import
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgMixedFormatDeserializer;
-import
org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatConverterValidator;
-import
org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatDeserializerValidator;
-import org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatFactory;
+import
org.apache.inlong.sort.formats.inlongmsg.row.AbstractInLongMsgMixedFormatConverter;
+import
org.apache.inlong.sort.formats.inlongmsg.row.AbstractInLongMsgMixedFormatDeserializer;
+import
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgMixedFormatConverterValidator;
+import
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgMixedFormatDeserializerValidator;
+import
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgMixedFormatFactory;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.factories.TableFormatFactoryBase;
@@ -44,10 +44,10 @@ import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_LI
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_NULL_LITERAL;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_QUOTE_CHARACTER;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_SCHEMA;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_RETAIN_PREDEFINED_FIELD;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.getDataRowFormatInfo;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_RETAIN_PREDEFINED_FIELD;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.getDataRowFormatInfo;
/**
* Table format factory for providing configured instances of
InLongMsgKv-to-row
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvMixedFormatConverter.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvMixedFormatConverter.java
index 6eef05212a..a70ff5a1a3 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvMixedFormatConverter.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvMixedFormatConverter.java
@@ -18,9 +18,9 @@
package org.apache.inlong.sort.formats.inlongmsgkv;
import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
-import
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgMixedFormatConverter;
-import
org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatConverterBuilder;
-import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
+import
org.apache.inlong.sort.formats.inlongmsg.row.AbstractInLongMsgMixedFormatConverter;
+import
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgMixedFormatConverterBuilder;
+import org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.types.Row;
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvMixedFormatDeserializer.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvMixedFormatDeserializer.java
index cceefd517e..ba85accfc8 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvMixedFormatDeserializer.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvMixedFormatDeserializer.java
@@ -17,12 +17,12 @@
package org.apache.inlong.sort.formats.inlongmsgkv;
-import
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgMixedFormatDeserializer;
import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
-import
org.apache.inlong.sort.formats.inlongmsg.InLongMsgTextMixedFormatDeserializerBuilder;
-import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
+import
org.apache.inlong.sort.formats.inlongmsg.row.AbstractInLongMsgMixedFormatDeserializer;
+import
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgTextMixedFormatDeserializerBuilder;
+import org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.descriptors.DescriptorProperties;
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
index 5d5d3b860a..faeb11fa18 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
@@ -32,15 +32,15 @@ import java.util.Map;
import java.util.stream.Collectors;
import static
org.apache.inlong.sort.formats.base.TableFormatUtils.deserializeBasicField;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_INTERFACE_NAME;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_STREAM_ID;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TID;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TIME_DT;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TIME_T;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.getPredefinedFields;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseAttr;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseDateTime;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseEpochTime;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.INLONGMSG_ATTR_INTERFACE_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.INLONGMSG_ATTR_STREAM_ID;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.INLONGMSG_ATTR_TID;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.INLONGMSG_ATTR_TIME_DT;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.INLONGMSG_ATTR_TIME_T;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.getPredefinedFields;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.parseAttr;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.parseDateTime;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.parseEpochTime;
import static org.apache.inlong.sort.formats.util.StringUtils.splitKv;
/**
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvValidator.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvValidator.java
index e290784c09..a88dfe4fca 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvValidator.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvValidator.java
@@ -24,7 +24,7 @@ import
org.apache.flink.table.descriptors.DescriptorProperties;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_ENTRY_DELIMITER;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_KV_DELIMITER;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_LINE_DELIMITER;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.validateInLongMsgSchema;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.validateInLongMsgSchema;
/**
* The validator for {@link InLongMsgKv}.
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatFactoryTest.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatFactoryTest.java
index 16a7adc8b0..9d8eb59735 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatFactoryTest.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatFactoryTest.java
@@ -35,8 +35,8 @@ import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
import static
org.apache.inlong.sort.formats.inlongmsgkv.InLongMsgKvUtils.DEFAULT_INLONGMSGKV_CHARSET;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsv.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsv.java
index 168ba7b5ec..ffd54bcdf3 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsv.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsv.java
@@ -18,7 +18,7 @@
package org.apache.inlong.sort.formats.inlongmsgtlogcsv;
import org.apache.inlong.sort.formats.base.TextFormatDescriptor;
-import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
+import org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_DELIMITER;
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java
index e569d0f43f..eddbbccbc1 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java
@@ -19,11 +19,11 @@ package org.apache.inlong.sort.formats.inlongmsgtlogcsv;
import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
import org.apache.inlong.sort.formats.base.TextFormatBuilder;
-import
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgFormatDeserializer;
import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
-import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
+import
org.apache.inlong.sort.formats.inlongmsg.row.AbstractInLongMsgFormatDeserializer;
+import org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.descriptors.DescriptorProperties;
@@ -38,10 +38,10 @@ import java.util.Objects;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_DELIMITER;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_DELIMITER;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
/**
* The deserializer for the records in InLongMsgTlogCsv format.
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatFactory.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatFactory.java
index 5f67880178..c7f8366a89 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatFactory.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatFactory.java
@@ -21,11 +21,11 @@ import
org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
import org.apache.inlong.sort.formats.base.TableFormatDeserializer;
import
org.apache.inlong.sort.formats.base.TableFormatDeserializer.TableFormatContext;
import org.apache.inlong.sort.formats.base.TableFormatDeserializerFactory;
-import
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgMixedFormatConverter;
-import
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgMixedFormatDeserializer;
-import
org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatConverterValidator;
-import
org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatDeserializerValidator;
-import org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatFactory;
+import
org.apache.inlong.sort.formats.inlongmsg.row.AbstractInLongMsgMixedFormatConverter;
+import
org.apache.inlong.sort.formats.inlongmsg.row.AbstractInLongMsgMixedFormatDeserializer;
+import
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgMixedFormatConverterValidator;
+import
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgMixedFormatDeserializerValidator;
+import
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgMixedFormatFactory;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.factories.TableFormatFactoryBase;
@@ -42,9 +42,9 @@ import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_IG
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_NULL_LITERAL;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_QUOTE_CHARACTER;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_SCHEMA;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.getDataRowFormatInfo;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.getDataRowFormatInfo;
/**
* Table format factory for providing configured instances of
InLongMsgTlogCsv-to-row
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvMixedFormatConverter.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvMixedFormatConverter.java
index 27a1157dcd..4f587fb10b 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvMixedFormatConverter.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvMixedFormatConverter.java
@@ -18,9 +18,9 @@
package org.apache.inlong.sort.formats.inlongmsgtlogcsv;
import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
-import
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgMixedFormatConverter;
-import
org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatConverterBuilder;
-import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
+import
org.apache.inlong.sort.formats.inlongmsg.row.AbstractInLongMsgMixedFormatConverter;
+import
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgMixedFormatConverterBuilder;
+import org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.types.Row;
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvMixedFormatDeserializer.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvMixedFormatDeserializer.java
index 11f86d204e..1cfb39827e 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvMixedFormatDeserializer.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvMixedFormatDeserializer.java
@@ -17,12 +17,12 @@
package org.apache.inlong.sort.formats.inlongmsgtlogcsv;
-import
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgMixedFormatDeserializer;
import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
-import
org.apache.inlong.sort.formats.inlongmsg.InLongMsgTextMixedFormatDeserializerBuilder;
-import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
+import
org.apache.inlong.sort.formats.inlongmsg.row.AbstractInLongMsgMixedFormatDeserializer;
+import
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgTextMixedFormatDeserializerBuilder;
+import org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.descriptors.DescriptorProperties;
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
index 9e0e35952b..d9a699b174 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
@@ -35,12 +35,12 @@ import java.util.Map;
import java.util.stream.Collectors;
import static
org.apache.inlong.sort.formats.base.TableFormatUtils.deserializeBasicField;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TIME_DT;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TIME_T;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.getPredefinedFields;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseAttr;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseDateTime;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseEpochTime;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.INLONGMSG_ATTR_TIME_DT;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.INLONGMSG_ATTR_TIME_T;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.getPredefinedFields;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.parseAttr;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.parseDateTime;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.parseEpochTime;
import static org.apache.inlong.sort.formats.util.StringUtils.splitCsv;
/**
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvValidator.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvValidator.java
index 1e9f297004..f3ed57267c 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvValidator.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvValidator.java
@@ -22,7 +22,7 @@ import
org.apache.inlong.sort.formats.base.TextFormatDescriptorValidator;
import org.apache.flink.table.descriptors.DescriptorProperties;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_DELIMITER;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.validateInLongMsgSchema;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.validateInLongMsgSchema;
/**
* The validator for {@link InLongMsgTlogCsv}.
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java
index 19c18ec49c..7d66b9ba1f 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java
@@ -41,8 +41,8 @@ import java.util.Map;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_CHARSET;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_DELIMITER;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
import static org.junit.Assert.assertEquals;
/**
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatFactoryTest.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatFactoryTest.java
index 06cde665a4..9cf4d5b88b 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatFactoryTest.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatFactoryTest.java
@@ -25,7 +25,7 @@ import
org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
import
org.apache.inlong.common.pojo.sort.dataflow.field.format.StringFormatInfo;
import org.apache.inlong.sort.formats.base.TableFormatDeserializer;
import org.apache.inlong.sort.formats.base.TableFormatForRowUtils;
-import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
+import org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.TableSchema;
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKv.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKv.java
index eb7ec66d35..8c324dc1d6 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKv.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKv.java
@@ -18,7 +18,7 @@
package org.apache.inlong.sort.formats.inlongmsgtlogkv;
import org.apache.inlong.sort.formats.base.TextFormatDescriptor;
-import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
+import org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_DELIMITER;
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializer.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializer.java
index 9f3df07d5d..0e4f9f37e5 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializer.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializer.java
@@ -19,11 +19,11 @@ package org.apache.inlong.sort.formats.inlongmsgtlogkv;
import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
import org.apache.inlong.sort.formats.base.TextFormatBuilder;
-import
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgFormatDeserializer;
import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
-import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
+import
org.apache.inlong.sort.formats.inlongmsg.row.AbstractInLongMsgFormatDeserializer;
+import org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.descriptors.DescriptorProperties;
@@ -42,10 +42,10 @@ import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_K
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_DELIMITER;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_ENTRY_DELIMITER;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_KV_DELIMITER;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
import static
org.apache.inlong.sort.formats.inlongmsgtlogkv.InLongMsgTlogKvUtils.DEFAULT_INLONGMSG_TLOGKV_CHARSET;
/**
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatFactory.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatFactory.java
index 611a3dc303..88fe090f28 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatFactory.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatFactory.java
@@ -21,11 +21,11 @@ import
org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
import org.apache.inlong.sort.formats.base.TableFormatDeserializer;
import
org.apache.inlong.sort.formats.base.TableFormatDeserializer.TableFormatContext;
import org.apache.inlong.sort.formats.base.TableFormatDeserializerFactory;
-import
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgMixedFormatConverter;
-import
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgMixedFormatDeserializer;
-import
org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatConverterValidator;
-import
org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatDeserializerValidator;
-import org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatFactory;
+import
org.apache.inlong.sort.formats.inlongmsg.row.AbstractInLongMsgMixedFormatConverter;
+import
org.apache.inlong.sort.formats.inlongmsg.row.AbstractInLongMsgMixedFormatDeserializer;
+import
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgMixedFormatConverterValidator;
+import
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgMixedFormatDeserializerValidator;
+import
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgMixedFormatFactory;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.factories.TableFormatFactoryBase;
@@ -44,9 +44,9 @@ import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_KV
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_NULL_LITERAL;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_QUOTE_CHARACTER;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_SCHEMA;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.getDataRowFormatInfo;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.getDataRowFormatInfo;
import static
org.apache.inlong.sort.formats.inlongmsgtlogkv.InLongMsgTlogKv.FORMAT_TYPE_VALUE;
/**
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvMixedFormatConverter.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvMixedFormatConverter.java
index 417343351e..7099fd995e 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvMixedFormatConverter.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvMixedFormatConverter.java
@@ -18,9 +18,9 @@
package org.apache.inlong.sort.formats.inlongmsgtlogkv;
import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
-import
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgMixedFormatConverter;
-import
org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatConverterBuilder;
-import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
+import
org.apache.inlong.sort.formats.inlongmsg.row.AbstractInLongMsgMixedFormatConverter;
+import
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgMixedFormatConverterBuilder;
+import org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.types.Row;
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvMixedFormatDeserializer.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvMixedFormatDeserializer.java
index 84a6823e35..b756a02d99 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvMixedFormatDeserializer.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvMixedFormatDeserializer.java
@@ -17,12 +17,12 @@
package org.apache.inlong.sort.formats.inlongmsgtlogkv;
-import
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgMixedFormatDeserializer;
import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
-import
org.apache.inlong.sort.formats.inlongmsg.InLongMsgTextMixedFormatDeserializerBuilder;
-import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
+import
org.apache.inlong.sort.formats.inlongmsg.row.AbstractInLongMsgMixedFormatDeserializer;
+import
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgTextMixedFormatDeserializerBuilder;
+import org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.descriptors.DescriptorProperties;
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
index 298cc650e0..b2cecd8248 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
@@ -31,12 +31,12 @@ import java.util.List;
import java.util.Map;
import static
org.apache.inlong.sort.formats.base.TableFormatUtils.deserializeBasicField;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TIME_DT;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TIME_T;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.getPredefinedFields;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseAttr;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseDateTime;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseEpochTime;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.INLONGMSG_ATTR_TIME_DT;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.INLONGMSG_ATTR_TIME_T;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.getPredefinedFields;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.parseAttr;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.parseDateTime;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.parseEpochTime;
import static org.apache.inlong.sort.formats.util.StringUtils.splitCsv;
import static org.apache.inlong.sort.formats.util.StringUtils.splitKv;
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvValidator.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvValidator.java
index 8b163a4bb2..43315097ef 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvValidator.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvValidator.java
@@ -24,7 +24,7 @@ import
org.apache.flink.table.descriptors.DescriptorProperties;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_DELIMITER;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_ENTRY_DELIMITER;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_KV_DELIMITER;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.validateInLongMsgSchema;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.validateInLongMsgSchema;
/**
* The validator for {@link InLongMsgTlogKv}.
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializerTest.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializerTest.java
index d361313916..598d7bd5dd 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializerTest.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializerTest.java
@@ -41,8 +41,8 @@ import java.util.Map;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_DELIMITER;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_ENTRY_DELIMITER;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_KV_DELIMITER;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
import static
org.apache.inlong.sort.formats.inlongmsgtlogkv.InLongMsgTlogKvUtils.DEFAULT_INLONGMSG_TLOGKV_CHARSET;
import static org.junit.Assert.assertEquals;
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatFactoryTest.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatFactoryTest.java
index 5022c64b87..8922dc8a55 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatFactoryTest.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatFactoryTest.java
@@ -25,7 +25,7 @@ import
org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
import
org.apache.inlong.common.pojo.sort.dataflow.field.format.StringFormatInfo;
import org.apache.inlong.sort.formats.base.TableFormatDeserializer;
import org.apache.inlong.sort.formats.base.TableFormatForRowUtils;
-import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
+import org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.TableSchema;