This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 0ab0d5dc6 [INLONG-6998][Dashboard][Manager] Support ignore 
deserialization error (#7001)
0ab0d5dc6 is described below

commit 0ab0d5dc6055ea7b4ee566420d861b927a9a6803
Author: featzhang <[email protected]>
AuthorDate: Fri Dec 23 18:58:00 2022 +0800

    [INLONG-6998][Dashboard][Manager] Support ignore deserialization error 
(#7001)
---
 inlong-dashboard/src/locales/cn.json               |  1 +
 inlong-dashboard/src/locales/en.json               |  1 +
 .../src/metas/streams/common/StreamDefaultInfo.ts  | 66 ++++++++++------
 .../manager/common/consts/InlongConstants.java     |  2 +
 .../manager/pojo/sort/util/ExtractNodeUtils.java   | 30 ++++++--
 .../inlong/manager/pojo/source/StreamSource.java   |  3 +
 .../manager/pojo/stream/InlongStreamExtParam.java  | 87 ++++++++++++++++++++++
 .../manager/pojo/stream/InlongStreamInfo.java      |  9 ++-
 .../manager/pojo/stream/InlongStreamRequest.java   |  5 ++
 .../service/source/kafka/KafkaSourceOperator.java  | 11 +--
 .../source/pulsar/PulsarSourceOperator.java        | 10 +--
 .../source/tubemq/TubeMQSourceOperator.java        |  5 ++
 .../service/stream/InlongStreamServiceImpl.java    | 45 +++++++++--
 13 files changed, 224 insertions(+), 51 deletions(-)

diff --git a/inlong-dashboard/src/locales/cn.json 
b/inlong-dashboard/src/locales/cn.json
index 54382ab28..213a25075 100644
--- a/inlong-dashboard/src/locales/cn.json
+++ b/inlong-dashboard/src/locales/cn.json
@@ -367,6 +367,7 @@
   "meta.Stream.DataEncoding": "数据编码",
   "meta.Stream.Description": "介绍",
   "meta.Stream.SourceDataField": "源数据字段",
+  "meta.Stream.IgnoreParseError": "忽略数据解析错误",
   "meta.Stream.Status.New": "新建",
   "meta.Stream.Status.Pending": "配置中",
   "meta.Stream.Status.Error": "配置失败",
diff --git a/inlong-dashboard/src/locales/en.json 
b/inlong-dashboard/src/locales/en.json
index 64ae4a6f5..a98d14d3e 100644
--- a/inlong-dashboard/src/locales/en.json
+++ b/inlong-dashboard/src/locales/en.json
@@ -367,6 +367,7 @@
   "meta.Stream.DataEncoding": "Data encoding",
   "meta.Stream.Description": "Description",
   "meta.Stream.SourceDataField": "Source fields",
+  "meta.Stream.IgnoreParseError": "Ignore parse error",
   "meta.Stream.Status.New": "New",
   "meta.Stream.Status.Pending": "Pending",
   "meta.Stream.Status.Error": "Error",
diff --git a/inlong-dashboard/src/metas/streams/common/StreamDefaultInfo.ts 
b/inlong-dashboard/src/metas/streams/common/StreamDefaultInfo.ts
index 1ecd4a9cc..76d7b85d3 100644
--- a/inlong-dashboard/src/metas/streams/common/StreamDefaultInfo.ts
+++ b/inlong-dashboard/src/metas/streams/common/StreamDefaultInfo.ts
@@ -116,6 +116,27 @@ export class StreamDefaultInfo implements DataWithBackend, 
RenderRow, RenderList
   @I18n('meta.Stream.DataType')
   dataType: string;
 
+  @FieldDecorator({
+    type: 'radio',
+    rules: [{ required: true }],
+    initialValue: true,
+    props: values => ({
+      disabled: [110, 130].includes(values?.status),
+      options: [
+        {
+          label: i18n.t('basic.Yes'),
+          value: true,
+        },
+        {
+          label: i18n.t('basic.No'),
+          value: false,
+        },
+      ],
+    }),
+  })
+  @I18n('meta.Stream.IgnoreParseError')
+  ignoreParseError: boolean;
+
   @FieldDecorator({
     type: 'radio',
     initialValue: 'UTF-8',
@@ -188,28 +209,6 @@ export class StreamDefaultInfo implements DataWithBackend, 
RenderRow, RenderList
   @I18n('meta.Stream.DataSeparator')
   dataSeparator: string;
 
-  @FieldDecorator({
-    type: 'radio',
-    isPro: true,
-    rules: [{ required: true }],
-    initialValue: 1,
-    tooltip: i18n.t('meta.Stream.WrapWithInlongMsgHelp'),
-    props: values => ({
-      options: [
-        {
-          label: i18n.t('basic.Yes'),
-          value: 1,
-        },
-        {
-          label: i18n.t('basic.No'),
-          value: 0,
-        },
-      ],
-    }),
-  })
-  @I18n('meta.Stream.WrapWithInlongMsg')
-  wrapWithInlongMsg: number;
-
   @FieldDecorator({
     type: EditableTable,
     props: values => ({
@@ -251,6 +250,29 @@ export class StreamDefaultInfo implements DataWithBackend, 
RenderRow, RenderList
   @I18n('meta.Stream.SourceDataField')
   rowTypeFields: Record<string, string>[];
 
+  @FieldDecorator({
+    type: 'radio',
+    isPro: true,
+    rules: [{ required: true }],
+    initialValue: true,
+    tooltip: i18n.t('meta.Stream.WrapWithInlongMsgHelp'),
+    props: values => ({
+      disabled: [110, 130].includes(values?.status),
+      options: [
+        {
+          label: i18n.t('basic.Yes'),
+          value: true,
+        },
+        {
+          label: i18n.t('basic.No'),
+          value: false,
+        },
+      ],
+    }),
+  })
+  @I18n('meta.Stream.WrapWithInlongMsg')
+  wrapWithInlongMsg: boolean;
+
   parse(data) {
     return data;
   }
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
index 922fec58a..c41299faa 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
@@ -138,4 +138,6 @@ public class InlongConstants {
 
     public static final String DATA_TYPE_RAW_PREFIX = "RAW_";
 
+    public static final int DEFAULT_ENABLE_VALUE = 1;
+
 }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
index 05cb5760d..72dda505c 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
@@ -172,8 +172,11 @@ public class ExtractNodeUtils {
         String topic = kafkaSource.getTopic();
         String bootstrapServers = kafkaSource.getBootstrapServers();
 
-        Format format = parsingFormat(kafkaSource.getSerializationType(),
-                kafkaSource.isWrapWithInlongMsg(), 
kafkaSource.getDataSeparator());
+        Format format = parsingFormat(
+                kafkaSource.getSerializationType(),
+                kafkaSource.isWrapWithInlongMsg(),
+                kafkaSource.getDataSeparator(),
+                kafkaSource.isIgnoreParseErrors());
 
         KafkaOffset kafkaOffset = 
KafkaOffset.forName(kafkaSource.getAutoOffsetReset());
         KafkaScanStartupMode startupMode;
@@ -223,7 +226,9 @@ public class ExtractNodeUtils {
                 pulsarSource.getTenant() + "/" + pulsarSource.getNamespace() + 
"/" + pulsarSource.getTopic();
 
         Format format = parsingFormat(pulsarSource.getSerializationType(),
-                pulsarSource.isWrapWithInlongMsg(), 
pulsarSource.getDataSeparator());
+                pulsarSource.isWrapWithInlongMsg(),
+                pulsarSource.getDataSeparator(),
+                pulsarSource.isIgnoreParseError());
 
         PulsarScanStartupMode startupMode = 
PulsarScanStartupMode.forName(pulsarSource.getScanStartupMode());
         final String primaryKey = pulsarSource.getPrimaryKey();
@@ -441,9 +446,14 @@ public class ExtractNodeUtils {
      * @param serializationType data serialization, support: csv, json, canal, 
avro, etc
      * @param wrapWithInlongMsg whether wrap content with {@link 
InLongMsgFormat}
      * @param separatorStr the separator of data content
+     * @param ignoreParseErrors whether to ignore data that fails deserialization
      * @return the format for serialized content
      */
-    private static Format parsingFormat(String serializationType, boolean 
wrapWithInlongMsg, String separatorStr) {
+    private static Format parsingFormat(
+            String serializationType,
+            boolean wrapWithInlongMsg,
+            String separatorStr,
+            boolean ignoreParseErrors) {
         Format format;
         DataTypeEnum dataType = DataTypeEnum.forType(serializationType);
         switch (dataType) {
@@ -452,19 +462,25 @@ public class ExtractNodeUtils {
                     char dataSeparator = (char) Integer.parseInt(separatorStr);
                     separatorStr = Character.toString(dataSeparator);
                 }
-                format = new CsvFormat(separatorStr);
+                CsvFormat csvFormat = new CsvFormat(separatorStr);
+                csvFormat.setIgnoreParseErrors(ignoreParseErrors);
+                format = csvFormat;
                 break;
             case AVRO:
                 format = new AvroFormat();
                 break;
             case JSON:
-                format = new JsonFormat();
+                JsonFormat jsonFormat = new JsonFormat();
+                jsonFormat.setIgnoreParseErrors(ignoreParseErrors);
+                format = jsonFormat;
                 break;
             case CANAL:
                 format = new CanalJsonFormat();
                 break;
             case DEBEZIUM_JSON:
-                format = new DebeziumJsonFormat();
+                DebeziumJsonFormat debeziumJsonFormat = new 
DebeziumJsonFormat();
+                debeziumJsonFormat.setIgnoreParseErrors(ignoreParseErrors);
+                format = debeziumJsonFormat;
                 break;
             case RAW:
                 format = new RawFormat();
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/StreamSource.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/StreamSource.java
index 69719a9c4..93b7a0938 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/StreamSource.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/StreamSource.java
@@ -110,6 +110,9 @@ public abstract class StreamSource extends StreamNode {
     @ApiModelProperty("Sub source information of existing agents")
     private List<SubSourceDTO> subSourceList;
 
+    @ApiModelProperty(value = "Whether to ignore the parse errors of field 
value, true as default")
+    private boolean ignoreParseError;
+
     public SourceRequest genSourceRequest() {
         return null;
     }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
new file mode 100644
index 000000000..6186b7dc4
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.stream;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+
+import java.io.Serializable;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+
+/**
+ * Extended params, will be saved as JSON string
+ */
+@Data
+@EqualsAndHashCode(callSuper = false)
+@NoArgsConstructor
+@AllArgsConstructor
+@Builder
+@ApiModel("Inlong stream ext param info")
+public class InlongStreamExtParam implements Serializable {
+
+    @ApiModelProperty(value = "Whether to ignore the parse errors of field 
value")
+    private boolean ignoreParseError;
+
+    @ApiModelProperty(value = "Whether the message body wrapped with 
InlongMsg")
+    private boolean wrapWithInlongMsg;
+
+    /**
+     * Pack extended attributes into ExtParams
+     *
+     * @param request the request
+     * @return the packed extParams
+     */
+    public static String packExtParams(InlongStreamRequest request) {
+        InlongStreamExtParam extParam = 
CommonBeanUtils.copyProperties(request, InlongStreamExtParam::new,
+                true);
+        return JsonUtils.toJsonString(extParam);
+    }
+
+    /**
+     * Unpack extended attributes from the extParams JSON string and copy them onto the target object.
+     *
+     * @param extParams the extParams value loaded from db
+     * @param targetObject the target object to fill in
+     */
+    public static void unpackExtParams(
+            String extParams,
+            Object targetObject) {
+        if (StringUtils.isNoneBlank(extParams)) {
+            InlongStreamExtParam inlongStreamExtParam = 
JsonUtils.parseObject(extParams, InlongStreamExtParam.class);
+            if (inlongStreamExtParam != null) {
+                CommonBeanUtils.copyProperties(inlongStreamExtParam, 
targetObject, true);
+            }
+        }
+    }
+
+    /**
+     * Expand the extParams field and fill in the {@link InlongStreamInfo}
+     *
+     * @param streamInfo the InlongStreamInfo that needs to be filled
+     */
+    public static void unpackExtParams(InlongStreamInfo streamInfo) {
+        unpackExtParams(streamInfo.getExtParams(), streamInfo);
+    }
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java
index c17d59bfa..55fd159c8 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java
@@ -65,9 +65,6 @@ public class InlongStreamInfo extends BaseInlongStream {
     @ApiModelProperty(value = "Data type, including: TEXT, KV, etc.")
     private String dataType;
 
-    @ApiModelProperty(value = "Whether the message body wrapped with 
InlongMsg, 0: no, 1: yes (as default)")
-    private Integer wrapWithInlongMsg;
-
     @ApiModelProperty(value = "Data encoding format: UTF-8, GBK")
     private String dataEncoding;
 
@@ -132,6 +129,12 @@ public class InlongStreamInfo extends BaseInlongStream {
     @ApiModelProperty(value = "Version number")
     private Integer version;
 
+    @ApiModelProperty(value = "Whether the message body wrapped with 
InlongMsg")
+    private Boolean wrapWithInlongMsg = true;
+
+    @ApiModelProperty(value = "Whether to ignore the parse errors of field 
value")
+    private Boolean ignoreParseError = true;
+
     public InlongStreamRequest genRequest() {
         return CommonBeanUtils.copyProperties(this, InlongStreamRequest::new);
     }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
index ecabc8232..b8061daff 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
@@ -101,4 +101,9 @@ public class InlongStreamRequest extends BaseInlongStream {
     @ApiModelProperty(value = "Version number")
     private Integer version;
 
+    @ApiModelProperty(value = "Whether to ignore the parse errors of field 
value")
+    private boolean ignoreParseError = true;
+
+    @ApiModelProperty(value = "Whether the message body wrapped with 
InlongMsg")
+    private boolean wrapWithInlongMsg = true;
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
index 62a858da7..6b6dff0cb 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
@@ -17,11 +17,10 @@
 
 package org.apache.inlong.manager.service.source.kafka;
 
-import static 
org.apache.inlong.manager.pojo.stream.InlongStreamInfo.ENABLE_WRAP_WITH_INLONG_MSG;
-
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import org.apache.inlong.common.enums.DataTypeEnum;
 import org.apache.inlong.manager.common.consts.SourceType;
 import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
@@ -111,6 +110,10 @@ public class KafkaSourceOperator extends 
AbstractSourceOperator {
             kafkaSource.setSourceName(streamId);
             kafkaSource.setBootstrapServers(bootstrapServers);
             kafkaSource.setTopic(streamInfo.getMqResource());
+            String serializationType = 
DataTypeEnum.forType(streamInfo.getDataType()).getType();
+            kafkaSource.setSerializationType(serializationType);
+            kafkaSource.setIgnoreParseError(streamInfo.getIgnoreParseError());
+
             for (StreamSource sourceInfo : streamSources) {
                 if (!Objects.equals(streamId, sourceInfo.getInlongStreamId())) 
{
                     continue;
@@ -118,9 +121,7 @@ public class KafkaSourceOperator extends 
AbstractSourceOperator {
                 
kafkaSource.setSerializationType(sourceInfo.getSerializationType());
             }
 
-            Integer wrapWithInlongMsg = streamInfo.getWrapWithInlongMsg();
-            kafkaSource.setWrapWithInlongMsg(
-                    null == wrapWithInlongMsg || ENABLE_WRAP_WITH_INLONG_MSG 
== wrapWithInlongMsg);
+            
kafkaSource.setWrapWithInlongMsg(streamInfo.getWrapWithInlongMsg());
 
             kafkaSource.setAutoOffsetReset(KafkaOffset.EARLIEST.getName());
             kafkaSource.setFieldList(streamInfo.getFieldList());
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
index 57600f0a1..a29020a0e 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
@@ -17,8 +17,6 @@
 
 package org.apache.inlong.manager.service.source.pulsar;
 
-import static 
org.apache.inlong.manager.pojo.stream.InlongStreamInfo.ENABLE_WRAP_WITH_INLONG_MSG;
-
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Objects;
 import com.google.common.collect.Lists;
@@ -129,10 +127,10 @@ public class PulsarSourceOperator extends 
AbstractSourceOperator {
             pulsarSource.setAdminUrl(adminUrl);
             pulsarSource.setServiceUrl(serviceUrl);
             pulsarSource.setInlongComponent(true);
-
-            Integer wrapWithInlongMsg = streamInfo.getWrapWithInlongMsg();
-            pulsarSource.setWrapWithInlongMsg(
-                    null == wrapWithInlongMsg || ENABLE_WRAP_WITH_INLONG_MSG 
== wrapWithInlongMsg);
+            String serializationType = 
DataTypeEnum.forType(streamInfo.getDataType()).getType();
+            pulsarSource.setSerializationType(serializationType);
+            
pulsarSource.setWrapWithInlongMsg(streamInfo.getWrapWithInlongMsg());
+            pulsarSource.setIgnoreParseError(streamInfo.getIgnoreParseError());
 
             // set the token info
             if (StringUtils.isNotBlank(pulsarCluster.getToken())) {
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java
index 09c71a6df..488c866d4 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.source.tubemq;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import org.apache.inlong.common.enums.DataTypeEnum;
 import org.apache.inlong.manager.common.consts.SourceType;
 import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
@@ -107,6 +108,10 @@ public class TubeMQSourceOperator extends 
AbstractSourceOperator {
             tubeMQSource.setTopic(streamInfo.getMqResource());
             tubeMQSource.setGroupId(streamId);
             tubeMQSource.setMasterRpc(masterRpc);
+            String serializationType = 
DataTypeEnum.forType(streamInfo.getDataType()).getType();
+            tubeMQSource.setSerializationType(serializationType);
+            tubeMQSource.setIgnoreParseError(streamInfo.getIgnoreParseError());
+
             for (StreamSource sourceInfo : streamSources) {
                 if (!Objects.equals(streamId, sourceInfo.getInlongStreamId())) 
{
                     continue;
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
index 1c94b27e5..6a719e8c5 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
@@ -72,6 +72,9 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.stream.Collectors;
 
+import static 
org.apache.inlong.manager.pojo.stream.InlongStreamExtParam.packExtParams;
+import static 
org.apache.inlong.manager.pojo.stream.InlongStreamExtParam.unpackExtParams;
+
 /**
  * Inlong stream service layer implementation
  */
@@ -121,11 +124,15 @@ public class InlongStreamServiceImpl implements 
InlongStreamService {
         streamEntity.setStatus(StreamStatus.NEW.getCode());
         streamEntity.setCreator(operator);
         streamEntity.setModifier(operator);
+        // Processing extended attributes
+        String extParam = packExtParams(request);
+        streamEntity.setExtParams(extParam);
 
         streamMapper.insertSelective(streamEntity);
         saveField(groupId, streamId, request.getFieldList());
-        if (CollectionUtils.isNotEmpty(request.getExtList())) {
-            saveOrUpdateExt(groupId, streamId, request.getExtList());
+        List<InlongStreamExtInfo> extList = request.getExtList();
+        if (CollectionUtils.isNotEmpty(extList)) {
+            saveOrUpdateExt(groupId, streamId, extList);
         }
 
         LOGGER.info("success to save inlong stream info for groupId={}", 
groupId);
@@ -178,6 +185,9 @@ public class InlongStreamServiceImpl implements 
InlongStreamService {
         if (StringUtils.isEmpty(request.getMqResource())) {
             request.setMqResource(streamId);
         }
+        // Processing extended attributes
+        String extParams = packExtParams(request);
+        request.setExtParams(extParams);
         // Processing inlong stream
         InlongStreamEntity streamEntity = 
CommonBeanUtils.copyProperties(request, InlongStreamEntity::new);
         streamEntity.setStatus(StreamStatus.NEW.getCode());
@@ -186,8 +196,9 @@ public class InlongStreamServiceImpl implements 
InlongStreamService {
         // add record
         streamMapper.insertSelective(streamEntity);
         saveField(groupId, streamId, request.getFieldList());
-        if (CollectionUtils.isNotEmpty(request.getExtList())) {
-            saveOrUpdateExt(groupId, streamId, request.getExtList());
+        List<InlongStreamExtInfo> extList = request.getExtList();
+        if (CollectionUtils.isNotEmpty(extList)) {
+            saveOrUpdateExt(groupId, streamId, extList);
         }
         return streamEntity.getId();
     }
@@ -215,9 +226,13 @@ public class InlongStreamServiceImpl implements 
InlongStreamService {
         InlongStreamInfo streamInfo = 
CommonBeanUtils.copyProperties(streamEntity, InlongStreamInfo::new);
         List<StreamField> streamFields = getStreamFields(groupId, streamId);
         streamInfo.setFieldList(streamFields);
+        // load ext infos
         List<InlongStreamExtEntity> extEntities = 
streamExtMapper.selectByRelatedId(groupId, streamId);
         List<InlongStreamExtInfo> exts = 
CommonBeanUtils.copyListProperties(extEntities, InlongStreamExtInfo::new);
         streamInfo.setExtList(exts);
+        // load extParams
+        unpackExtParams(streamEntity.getExtParams(), streamInfo);
+
         List<StreamSink> sinkList = sinkService.listSink(groupId, streamId);
         streamInfo.setSinkList(sinkList);
         List<StreamSource> sourceList = sourceService.listSource(groupId, 
streamId);
@@ -257,6 +272,9 @@ public class InlongStreamServiceImpl implements 
InlongStreamService {
             throw new BusinessException(ErrorCodeEnum.STREAM_NOT_FOUND);
         }
         InlongStreamInfo streamInfo = 
CommonBeanUtils.copyProperties(streamEntity, InlongStreamInfo::new);
+        // Processing extParams
+        unpackExtParams(streamEntity.getExtParams(), streamInfo);
+        // Load fields
         List<StreamField> streamFields = getStreamFields(groupId, streamId);
         streamInfo.setFieldList(streamFields);
         List<InlongStreamExtEntity> extEntities = 
streamExtMapper.selectByRelatedId(groupId, streamId);
@@ -288,6 +306,8 @@ public class InlongStreamServiceImpl implements 
InlongStreamService {
                         Collectors.toCollection(ArrayList::new)));
         streamList.forEach(streamInfo -> {
             String streamId = streamInfo.getInlongStreamId();
+            // Processing extParams
+            unpackExtParams(streamInfo.getExtParams(), streamInfo);
             List<StreamField> fieldInfos = streamFieldMap.get(streamId);
             streamInfo.setFieldList(fieldInfos);
             List<InlongStreamExtInfo> extInfos = extInfoMap.get(streamId);
@@ -363,10 +383,12 @@ public class InlongStreamServiceImpl implements 
InlongStreamService {
         for (InlongStreamInfo streamInfo : streamInfoList) {
             // Set the field information of the inlong stream
             String streamId = streamInfo.getInlongStreamId();
+            unpackExtParams(streamInfo);
             List<StreamField> streamFields = getStreamFields(groupId, 
streamId);
             streamInfo.setFieldList(streamFields);
+            List<InlongStreamExtEntity> extEntities = 
streamExtMapper.selectByRelatedId(groupId, streamId);
             List<InlongStreamExtInfo> streamExtInfos = 
CommonBeanUtils.copyListProperties(
-                    streamExtMapper.selectByRelatedId(groupId, streamId), 
InlongStreamExtInfo::new);
+                    extEntities, InlongStreamExtInfo::new);
             streamInfo.setExtList(streamExtInfos);
 
             // query all valid stream sources
@@ -471,6 +493,9 @@ public class InlongStreamServiceImpl implements 
InlongStreamService {
                     String.format("stream has already updated with groupId=%s, 
streamId=%s, curVersion=%s",
                             streamEntity.getInlongGroupId(), 
streamEntity.getInlongStreamId(), request.getVersion()));
         }
+        // Processing extended attributes
+        String extParams = packExtParams(request);
+        request.setExtParams(extParams);
         // update record
         CommonBeanUtils.copyProperties(request, streamEntity, true);
         streamEntity.setModifier(opInfo.getName());
@@ -480,7 +505,8 @@ public class InlongStreamServiceImpl implements 
InlongStreamService {
         // update stream fields
         updateField(groupId, streamId, request.getFieldList());
         // update stream extension infos
-        saveOrUpdateExt(groupId, streamId, request.getExtList());
+        List<InlongStreamExtInfo> extList = request.getExtList();
+        saveOrUpdateExt(groupId, streamId, extList);
         return true;
     }
 
@@ -503,7 +529,9 @@ public class InlongStreamServiceImpl implements 
InlongStreamService {
             LOGGER.error(errMsg);
             throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
         }
-
+        // Processing extended attributes
+        String extParams = packExtParams(request);
+        request.setExtParams(extParams);
         CommonBeanUtils.copyProperties(request, streamEntity, true);
         streamEntity.setModifier(operator);
         int rowCount = streamMapper.updateByIdentifierSelective(streamEntity);
@@ -514,7 +542,8 @@ public class InlongStreamServiceImpl implements 
InlongStreamService {
         // update stream fields
         updateField(groupId, streamId, request.getFieldList());
         // update stream extension infos
-        saveOrUpdateExt(groupId, streamId, request.getExtList());
+        List<InlongStreamExtInfo> extList = request.getExtList();
+        saveOrUpdateExt(groupId, streamId, extList);
 
         LOGGER.info("success to update inlong stream without check for 
groupId={} streamId={}", groupId, streamId);
         return true;

Reply via email to