This is an automated email from the ASF dual-hosted git repository.
vernedeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new ae9990cdc2 [INLONG-8988][Manager] Supports multiple wrap types for message body (#8989)
ae9990cdc2 is described below
commit ae9990cdc2fbad73662d0496e9c2c5cd928acc8e
Author: fuweng11 <[email protected]>
AuthorDate: Tue Sep 26 18:34:03 2023 +0800
[INLONG-8988][Manager] Supports multiple wrap types for message body (#8989)
* [INLONG-8988][Manager] Supports multiple wrap types for message body
---
inlong-manager/manager-dao/pom.xml | 8 +++++
.../manager/dao/entity/InlongStreamEntity.java | 1 +
.../resources/mappers/InlongStreamEntityMapper.xml | 40 ++++++++++++++--------
.../pojo/sort/node/base/ExtractNodeProvider.java | 5 +--
.../pojo/sort/node/provider/KafkaProvider.java | 2 +-
.../pojo/sort/node/provider/PulsarProvider.java | 2 +-
.../manager/pojo/source/kafka/KafkaSource.java | 5 +--
.../manager/pojo/source/pulsar/PulsarSource.java | 5 +--
.../manager/pojo/stream/InlongStreamBriefInfo.java | 4 +--
.../manager/pojo/stream/InlongStreamExtParam.java | 3 --
.../manager/pojo/stream/InlongStreamInfo.java | 4 +--
.../manager/pojo/stream/InlongStreamRequest.java | 5 +--
.../service/source/kafka/KafkaSourceOperator.java | 2 +-
.../source/pulsar/PulsarSourceOperator.java | 2 +-
.../main/resources/h2/apache_inlong_manager.sql | 1 +
.../manager-web/sql/apache_inlong_manager.sql | 1 +
inlong-manager/manager-web/sql/changes-1.10.0.sql | 31 +++++++++++++++++
17 files changed, 88 insertions(+), 33 deletions(-)
diff --git a/inlong-manager/manager-dao/pom.xml
b/inlong-manager/manager-dao/pom.xml
index 18e4dd06d4..7848b6dd26 100644
--- a/inlong-manager/manager-dao/pom.xml
+++ b/inlong-manager/manager-dao/pom.xml
@@ -65,6 +65,14 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>com.sun</groupId>
+ <artifactId>tools</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun</groupId>
+ <artifactId>jconsole</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
diff --git
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongStreamEntity.java
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongStreamEntity.java
index 0cad8a5083..182eec90cd 100644
---
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongStreamEntity.java
+++
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongStreamEntity.java
@@ -37,6 +37,7 @@ public class InlongStreamEntity implements Serializable {
private String mqResource;
private String dataType;
+ private String wrapType;
private String dataEncoding;
private String dataSeparator;
private String dataEscapeChar;
diff --git
a/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamEntityMapper.xml
b/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamEntityMapper.xml
index 6f1f0aef14..9a693c1236 100644
---
a/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamEntityMapper.xml
+++
b/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamEntityMapper.xml
@@ -29,6 +29,7 @@
<result column="mq_resource" jdbcType="VARCHAR" property="mqResource"/>
<result column="data_type" jdbcType="VARCHAR" property="dataType"/>
+ <result column="wrap_type" jdbcType="VARCHAR" property="wrapType"/>
<result column="data_encoding" jdbcType="VARCHAR"
property="dataEncoding"/>
<result column="data_separator" jdbcType="VARCHAR"
property="dataSeparator"/>
<result column="data_escape_char" jdbcType="VARCHAR"
property="dataEscapeChar"/>
@@ -52,7 +53,7 @@
</resultMap>
<sql id="Base_Column_List">
id, inlong_group_id, inlong_stream_id, name, description, mq_resource,
- data_type, data_encoding, data_separator, data_escape_char, sync_send,
+ data_type, wrap_type, data_encoding, data_separator, data_escape_char,
sync_send,
daily_records, daily_storage, peak_records, max_length,
storage_period, ext_params,
status, previous_status, is_deleted, creator, modifier, create_time,
modify_time, version
</sql>
@@ -61,18 +62,20 @@
parameterType="org.apache.inlong.manager.dao.entity.InlongStreamEntity">
insert into inlong_stream (id, inlong_group_id, inlong_stream_id,
name, description, mq_resource,
- data_type, data_encoding, data_separator,
- data_escape_char, sync_send, daily_records,
- daily_storage, peak_records, max_length,
- storage_period, ext_params, status,
- previous_status, creator, modifier)
+ data_type, wrap_type, data_encoding,
+ data_separator, data_escape_char, sync_send,
+ daily_records, daily_storage, peak_records,
+ max_length,storage_period, ext_params,
+ status, previous_status, creator,
+ modifier)
values (#{id,jdbcType=INTEGER}, #{inlongGroupId,jdbcType=VARCHAR},
#{inlongStreamId,jdbcType=VARCHAR},
#{name,jdbcType=VARCHAR}, #{description,jdbcType=VARCHAR},
#{mqResource,jdbcType=VARCHAR},
- #{dataType,jdbcType=VARCHAR},
#{dataEncoding,jdbcType=VARCHAR}, #{dataSeparator,jdbcType=VARCHAR},
- #{dataEscapeChar,jdbcType=VARCHAR},
#{syncSend,jdbcType=INTEGER}, #{dailyRecords,jdbcType=INTEGER},
- #{dailyStorage,jdbcType=INTEGER},
#{peakRecords,jdbcType=INTEGER}, #{maxLength,jdbcType=INTEGER},
- #{storagePeriod,jdbcType=INTEGER},
#{extParams,jdbcType=LONGVARCHAR}, #{status,jdbcType=INTEGER},
- #{previousStatus,jdbcType=INTEGER},
#{creator,jdbcType=VARCHAR}, #{modifier,jdbcType=VARCHAR})
+ #{dataType,jdbcType=VARCHAR}, #{wrapType,jdbcType=VARCHAR},
#{dataEncoding,jdbcType=VARCHAR},
+ #{dataSeparator,jdbcType=VARCHAR},
#{dataEscapeChar,jdbcType=VARCHAR}, #{syncSend,jdbcType=INTEGER},
+ #{dailyRecords,jdbcType=INTEGER},
#{dailyStorage,jdbcType=INTEGER}, #{peakRecords,jdbcType=INTEGER},
+ #{maxLength,jdbcType=INTEGER},
#{storagePeriod,jdbcType=INTEGER}, #{extParams,jdbcType=LONGVARCHAR},
+ #{status,jdbcType=INTEGER},
#{previousStatus,jdbcType=INTEGER}, #{creator,jdbcType=VARCHAR},
+ #{modifier,jdbcType=VARCHAR})
</insert>
<insert id="insertSelective" useGeneratedKeys="true" keyProperty="id"
parameterType="org.apache.inlong.manager.dao.entity.InlongStreamEntity">
@@ -99,6 +102,9 @@
<if test="dataType != null">
data_type,
</if>
+ <if test="wrapType != null">
+ wrap_type,
+ </if>
<if test="dataEncoding != null">
data_encoding,
</if>
@@ -164,6 +170,9 @@
<if test="dataType != null">
#{dataType,jdbcType=VARCHAR},
</if>
+ <if test="wrapType != null">
+ #{wrapType,jdbcType=VARCHAR},
+ </if>
<if test="dataEncoding != null">
#{dataEncoding,jdbcType=VARCHAR},
</if>
@@ -234,7 +243,7 @@
parameterType="org.apache.inlong.manager.pojo.stream.InlongStreamPageRequest">
select
distinct stream.id, stream.inlong_group_id, stream.inlong_stream_id,
stream.name,
- stream.description, stream.mq_resource, stream.data_type,
stream.data_encoding,
+ stream.description, stream.mq_resource, stream.data_type,
stream.wrap_type, stream.data_encoding,
stream.data_separator, stream.data_escape_char, stream.sync_send,
stream.daily_records,
stream.daily_storage, stream.peak_records, stream.max_length,
stream.storage_period,
stream.status, stream.creator, stream.modifier, stream.create_time,
stream.modify_time, stream.version
@@ -286,8 +295,7 @@
and is_deleted = 0
</select>
<select id="selectAllStreams"
resultType="org.apache.inlong.manager.pojo.sort.standalone.SortSourceStreamInfo">
- select
- inlong_group_id,
+ select inlong_group_id,
inlong_stream_id,
mq_resource,
ext_params
@@ -311,6 +319,7 @@
description = #{description, jdbcType=VARCHAR},
mq_resource = #{mqResource, jdbcType=VARCHAR},
data_type = #{dataType, jdbcType=VARCHAR},
+ wrap_type = #{wrapType, jdbcType=VARCHAR},
data_encoding = #{dataEncoding, jdbcType=VARCHAR},
data_separator = #{dataSeparator, jdbcType=VARCHAR},
data_escape_char = #{dataEscapeChar, jdbcType=VARCHAR},
@@ -351,6 +360,9 @@
<if test="dataType != null">
data_type = #{dataType, jdbcType=VARCHAR},
</if>
+ <if test="wrapType != null">
+ wrap_type = #{wrapType, jdbcType=VARCHAR},
+ </if>
<if test="dataEncoding != null">
data_encoding = #{dataEncoding, jdbcType=VARCHAR},
</if>
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/ExtractNodeProvider.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/ExtractNodeProvider.java
index a8622a7be8..42b36b7591 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/ExtractNodeProvider.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/ExtractNodeProvider.java
@@ -18,6 +18,7 @@
package org.apache.inlong.manager.pojo.sort.node.base;
import org.apache.inlong.common.enums.DataTypeEnum;
+import org.apache.inlong.common.enums.MessageWrapType;
import
org.apache.inlong.manager.common.fieldtype.strategy.FieldTypeMappingStrategy;
import org.apache.inlong.manager.pojo.sort.util.FieldInfoUtils;
import org.apache.inlong.manager.pojo.stream.StreamField;
@@ -92,7 +93,7 @@ public interface ExtractNodeProvider extends NodeProvider {
*/
default Format parsingFormat(
String serializationType,
- boolean wrapWithInlongMsg,
+ String wrapType,
String separatorStr,
boolean ignoreParseErrors) {
Format format;
@@ -129,7 +130,7 @@ public interface ExtractNodeProvider extends NodeProvider {
default:
throw new IllegalArgumentException(String.format("Unsupported
dataType=%s", dataType));
}
- if (wrapWithInlongMsg) {
+ if (Objects.equals(wrapType, MessageWrapType.INLONG_MSG_V0.getName()))
{
Format innerFormat = format;
format = new InLongMsgFormat(innerFormat, false);
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/KafkaProvider.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/KafkaProvider.java
index e8315660b6..01649bea41 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/KafkaProvider.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/KafkaProvider.java
@@ -64,7 +64,7 @@ public class KafkaProvider implements ExtractNodeProvider,
LoadNodeProvider {
Format format = parsingFormat(
kafkaSource.getSerializationType(),
- kafkaSource.isWrapWithInlongMsg(),
+ kafkaSource.getWrapType(),
kafkaSource.getDataSeparator(),
kafkaSource.isIgnoreParseErrors());
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java
index da20e8b203..b0bcd0c1c8 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java
@@ -52,7 +52,7 @@ public class PulsarProvider implements ExtractNodeProvider {
pulsarSource.getPulsarTenant() + "/" +
pulsarSource.getNamespace() + "/" + pulsarSource.getTopic();
Format format = parsingFormat(pulsarSource.getSerializationType(),
- pulsarSource.isWrapWithInlongMsg(),
+ pulsarSource.getWrapType(),
pulsarSource.getDataSeparator(),
pulsarSource.isIgnoreParseError());
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSource.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSource.java
index c7ffdbdbfc..0ccb0b45aa 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSource.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSource.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.pojo.source.kafka;
+import org.apache.inlong.common.enums.MessageWrapType;
import org.apache.inlong.manager.common.consts.SourceType;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.JsonTypeDefine;
@@ -92,8 +93,8 @@ public class KafkaSource extends StreamSource {
@ApiModelProperty(value = "Data field escape symbol")
private String dataEscapeChar;
- @ApiModelProperty("Whether wrap content with InlongMsg")
- private boolean wrapWithInlongMsg = true;
+ @ApiModelProperty(value = "The message body wrap wrap type, including:
RAW, INLONG_MSG_V0, INLONG_MSG_V1, etc")
+ private String wrapType = MessageWrapType.INLONG_MSG_V0.getName();
public KafkaSource() {
this.setSourceType(SourceType.KAFKA);
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSource.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSource.java
index 25d23f2e16..cdab4d59cf 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSource.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSource.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.pojo.source.pulsar;
+import org.apache.inlong.common.enums.MessageWrapType;
import org.apache.inlong.manager.common.consts.SourceType;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.JsonTypeDefine;
@@ -83,9 +84,9 @@ public class PulsarSource extends StreamSource {
@Builder.Default
private boolean isInlongComponent = false;
- @ApiModelProperty("Whether wrap content with InlongMsg")
+ @ApiModelProperty(value = "The message body wrap wrap type, including:
RAW, INLONG_MSG_V0, INLONG_MSG_V1, etc")
@Builder.Default
- private boolean wrapWithInlongMsg = true;
+ private String wrapType = MessageWrapType.INLONG_MSG_V0.getName();
public PulsarSource() {
this.setSourceType(SourceType.PULSAR);
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamBriefInfo.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamBriefInfo.java
index 28dadc2b53..90197f9d32 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamBriefInfo.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamBriefInfo.java
@@ -82,8 +82,8 @@ public class InlongStreamBriefInfo {
@ApiModelProperty(value = "Data storage period, unit: day")
private Integer storagePeriod;
- @ApiModelProperty(value = "Whether the message body wrapped with
InlongMsg")
- private Boolean wrapWithInlongMsg;
+ @ApiModelProperty(value = "The message body wrap type, including: RAW,
INLONG_MSG_V0, INLONG_MSG_V1, etc")
+ private String wrapType;
@ApiModelProperty(value = "Whether to ignore the parse errors of field
value")
private Boolean ignoreParseError;
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
index 0809b704cb..1e80e0bd7f 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
@@ -45,9 +45,6 @@ public class InlongStreamExtParam implements Serializable {
@ApiModelProperty(value = "Whether to ignore the parse errors of field
value")
private boolean ignoreParseError;
- @ApiModelProperty(value = "Whether the message body wrapped with
InlongMsg")
- private boolean wrapWithInlongMsg;
-
/**
* Pack extended attributes into ExtParams
*
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java
index c2b501b901..1a579d4960 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java
@@ -130,8 +130,8 @@ public class InlongStreamInfo extends BaseInlongStream {
@ApiModelProperty(value = "Version number")
private Integer version;
- @ApiModelProperty(value = "Whether the message body wrapped with
InlongMsg")
- private Boolean wrapWithInlongMsg = true;
+ @ApiModelProperty(value = "The message body wrap type, including: RAW,
INLONG_MSG_V0, INLONG_MSG_V1, etc")
+ private String wrapType;
@ApiModelProperty(value = "Whether to ignore the parse errors of field
value")
private Boolean ignoreParseError = true;
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
index 65de69d1db..e2944329f4 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
@@ -121,6 +121,7 @@ public class InlongStreamRequest extends BaseInlongStream {
@ApiModelProperty(value = "Whether to ignore the parse errors of field
value")
private boolean ignoreParseError = true;
- @ApiModelProperty(value = "Whether the message body wrapped with
InlongMsg")
- private boolean wrapWithInlongMsg = true;
+ @ApiModelProperty(value = "The message body wrap type, including: RAW,
INLONG_MSG_V0, INLONG_MSG_V1, PB, etc")
+ private String wrapType;
+
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
index 0037170e79..6e1db90b37 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
@@ -152,7 +152,7 @@ public class KafkaSourceOperator extends
AbstractSourceOperator {
}
}
-
kafkaSource.setWrapWithInlongMsg(streamInfo.getWrapWithInlongMsg());
+ kafkaSource.setWrapType(streamInfo.getWrapType());
kafkaSource.setAutoOffsetReset(KafkaOffset.EARLIEST.getName());
kafkaSource.setFieldList(streamInfo.getFieldList());
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
index 6aa998962a..989031184f 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
@@ -142,7 +142,7 @@ public class PulsarSourceOperator extends
AbstractSourceOperator {
String serializationType =
DataTypeEnum.forType(streamInfo.getDataType()).getType();
pulsarSource.setSerializationType(serializationType);
}
-
pulsarSource.setWrapWithInlongMsg(streamInfo.getWrapWithInlongMsg());
+ pulsarSource.setWrapType(streamInfo.getWrapType());
pulsarSource.setIgnoreParseError(streamInfo.getIgnoreParseError());
// set the token info
diff --git
a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
index 63ea3df4dd..2e02f2cc80 100644
---
a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
+++
b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
@@ -233,6 +233,7 @@ CREATE TABLE IF NOT EXISTS `inlong_stream`
`description` varchar(256) DEFAULT '' COMMENT 'Description
of inlong stream',
`mq_resource` varchar(128) DEFAULT NULL COMMENT 'MQ
resource, in one stream, corresponding to the filter ID of TubeMQ,
corresponding to the topic of Pulsar',
`data_type` varchar(20) DEFAULT NULL COMMENT 'Data type,
including: CSV, KEY-VALUE, JSON, AVRO, etc.',
+ `wrap_type` varchar(256) DEFAULT 'INLONG_MSG_V0' COMMENT
'The message body wrap type, including: RAW, INLONG_MSG_V0, INLONG_MSG_V1, etc',
`data_encoding` varchar(8) DEFAULT 'UTF-8' COMMENT 'Data
encoding format, including: UTF-8, GBK, etc.',
`data_separator` varchar(8) DEFAULT NULL COMMENT 'The source
data field separator',
`data_escape_char` varchar(8) DEFAULT NULL COMMENT 'Source data
field escape character, the default is NULL (NULL), stored as 1 character',
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index d96121ab14..1054722f2d 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -247,6 +247,7 @@ CREATE TABLE IF NOT EXISTS `inlong_stream`
`description` varchar(256) DEFAULT '' COMMENT 'Description
of inlong stream',
`mq_resource` varchar(128) DEFAULT NULL COMMENT 'MQ
resource, in one stream, corresponding to the filter ID of TubeMQ,
corresponding to the topic of Pulsar',
`data_type` varchar(20) DEFAULT NULL COMMENT 'Data type,
including: CSV, KEY-VALUE, JSON, AVRO, etc.',
+ `wrap_type` varchar(256) DEFAULT 'INLONG_MSG_V0' COMMENT
'The message body wrap type, including: RAW, INLONG_MSG_V0, INLONG_MSG_V1, etc',
`data_encoding` varchar(8) DEFAULT 'UTF-8' COMMENT 'Data
encoding format, including: UTF-8, GBK, etc.',
`data_separator` varchar(8) DEFAULT NULL COMMENT 'The source
data field separator',
`data_escape_char` varchar(8) DEFAULT NULL COMMENT 'Source data
field escape character, the default is NULL (NULL), stored as 1 character',
diff --git a/inlong-manager/manager-web/sql/changes-1.10.0.sql
b/inlong-manager/manager-web/sql/changes-1.10.0.sql
new file mode 100644
index 0000000000..1893852050
--- /dev/null
+++ b/inlong-manager/manager-web/sql/changes-1.10.0.sql
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- This is the SQL change file from version 1.9.0 to the current version 1.10.0.
+-- When upgrading to version 1.10.0, please execute those SQLs in the DB (such as MySQL) used by the Manager module.
+
+SET NAMES utf8mb4;
+SET FOREIGN_KEY_CHECKS = 0;
+
+USE `apache_inlong_manager`;
+
+ALTER TABLE `inlong_stream`
+ ADD COLUMN `wrap_type` varchar(256) DEFAULT 'INLONG_MSG_V0' COMMENT 'The
message body wrap type, including: RAW, INLONG_MSG_V0, INLONG_MSG_V1, etc';
+
+
+
+