This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 0ab0d5dc6 [INLONG-6998][Dashboard][Manager] Support ignore
deserialization error (#7001)
0ab0d5dc6 is described below
commit 0ab0d5dc6055ea7b4ee566420d861b927a9a6803
Author: featzhang <[email protected]>
AuthorDate: Fri Dec 23 18:58:00 2022 +0800
[INLONG-6998][Dashboard][Manager] Support ignore deserialization error
(#7001)
---
inlong-dashboard/src/locales/cn.json | 1 +
inlong-dashboard/src/locales/en.json | 1 +
.../src/metas/streams/common/StreamDefaultInfo.ts | 66 ++++++++++------
.../manager/common/consts/InlongConstants.java | 2 +
.../manager/pojo/sort/util/ExtractNodeUtils.java | 30 ++++++--
.../inlong/manager/pojo/source/StreamSource.java | 3 +
.../manager/pojo/stream/InlongStreamExtParam.java | 87 ++++++++++++++++++++++
.../manager/pojo/stream/InlongStreamInfo.java | 9 ++-
.../manager/pojo/stream/InlongStreamRequest.java | 5 ++
.../service/source/kafka/KafkaSourceOperator.java | 11 +--
.../source/pulsar/PulsarSourceOperator.java | 10 +--
.../source/tubemq/TubeMQSourceOperator.java | 5 ++
.../service/stream/InlongStreamServiceImpl.java | 45 +++++++++--
13 files changed, 224 insertions(+), 51 deletions(-)
diff --git a/inlong-dashboard/src/locales/cn.json
b/inlong-dashboard/src/locales/cn.json
index 54382ab28..213a25075 100644
--- a/inlong-dashboard/src/locales/cn.json
+++ b/inlong-dashboard/src/locales/cn.json
@@ -367,6 +367,7 @@
"meta.Stream.DataEncoding": "数据编码",
"meta.Stream.Description": "介绍",
"meta.Stream.SourceDataField": "源数据字段",
+ "meta.Stream.IgnoreParseError": "忽略数据解析错误",
"meta.Stream.Status.New": "新建",
"meta.Stream.Status.Pending": "配置中",
"meta.Stream.Status.Error": "配置失败",
diff --git a/inlong-dashboard/src/locales/en.json
b/inlong-dashboard/src/locales/en.json
index 64ae4a6f5..a98d14d3e 100644
--- a/inlong-dashboard/src/locales/en.json
+++ b/inlong-dashboard/src/locales/en.json
@@ -367,6 +367,7 @@
"meta.Stream.DataEncoding": "Data encoding",
"meta.Stream.Description": "Description",
"meta.Stream.SourceDataField": "Source fields",
+ "meta.Stream.IgnoreParseError": "Ignore parse error",
"meta.Stream.Status.New": "New",
"meta.Stream.Status.Pending": "Pending",
"meta.Stream.Status.Error": "Error",
diff --git a/inlong-dashboard/src/metas/streams/common/StreamDefaultInfo.ts
b/inlong-dashboard/src/metas/streams/common/StreamDefaultInfo.ts
index 1ecd4a9cc..76d7b85d3 100644
--- a/inlong-dashboard/src/metas/streams/common/StreamDefaultInfo.ts
+++ b/inlong-dashboard/src/metas/streams/common/StreamDefaultInfo.ts
@@ -116,6 +116,27 @@ export class StreamDefaultInfo implements DataWithBackend,
RenderRow, RenderList
@I18n('meta.Stream.DataType')
dataType: string;
+ @FieldDecorator({
+ type: 'radio',
+ rules: [{ required: true }],
+ initialValue: true,
+ props: values => ({
+ disabled: [110, 130].includes(values?.status),
+ options: [
+ {
+ label: i18n.t('basic.Yes'),
+ value: true,
+ },
+ {
+ label: i18n.t('basic.No'),
+ value: false,
+ },
+ ],
+ }),
+ })
+ @I18n('meta.Stream.IgnoreParseError')
+ ignoreParseError: boolean;
+
@FieldDecorator({
type: 'radio',
initialValue: 'UTF-8',
@@ -188,28 +209,6 @@ export class StreamDefaultInfo implements DataWithBackend,
RenderRow, RenderList
@I18n('meta.Stream.DataSeparator')
dataSeparator: string;
- @FieldDecorator({
- type: 'radio',
- isPro: true,
- rules: [{ required: true }],
- initialValue: 1,
- tooltip: i18n.t('meta.Stream.WrapWithInlongMsgHelp'),
- props: values => ({
- options: [
- {
- label: i18n.t('basic.Yes'),
- value: 1,
- },
- {
- label: i18n.t('basic.No'),
- value: 0,
- },
- ],
- }),
- })
- @I18n('meta.Stream.WrapWithInlongMsg')
- wrapWithInlongMsg: number;
-
@FieldDecorator({
type: EditableTable,
props: values => ({
@@ -251,6 +250,29 @@ export class StreamDefaultInfo implements DataWithBackend,
RenderRow, RenderList
@I18n('meta.Stream.SourceDataField')
rowTypeFields: Record<string, string>[];
+ @FieldDecorator({
+ type: 'radio',
+ isPro: true,
+ rules: [{ required: true }],
+ initialValue: true,
+ tooltip: i18n.t('meta.Stream.WrapWithInlongMsgHelp'),
+ props: values => ({
+ disabled: [110, 130].includes(values?.status),
+ options: [
+ {
+ label: i18n.t('basic.Yes'),
+ value: true,
+ },
+ {
+ label: i18n.t('basic.No'),
+ value: false,
+ },
+ ],
+ }),
+ })
+ @I18n('meta.Stream.WrapWithInlongMsg')
+ wrapWithInlongMsg: boolean;
+
parse(data) {
return data;
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
index 922fec58a..c41299faa 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
@@ -138,4 +138,6 @@ public class InlongConstants {
public static final String DATA_TYPE_RAW_PREFIX = "RAW_";
+ public static final int DEFAULT_ENABLE_VALUE = 1;
+
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
index 05cb5760d..72dda505c 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
@@ -172,8 +172,11 @@ public class ExtractNodeUtils {
String topic = kafkaSource.getTopic();
String bootstrapServers = kafkaSource.getBootstrapServers();
- Format format = parsingFormat(kafkaSource.getSerializationType(),
- kafkaSource.isWrapWithInlongMsg(),
kafkaSource.getDataSeparator());
+ Format format = parsingFormat(
+ kafkaSource.getSerializationType(),
+ kafkaSource.isWrapWithInlongMsg(),
+ kafkaSource.getDataSeparator(),
+ kafkaSource.isIgnoreParseErrors());
KafkaOffset kafkaOffset =
KafkaOffset.forName(kafkaSource.getAutoOffsetReset());
KafkaScanStartupMode startupMode;
@@ -223,7 +226,9 @@ public class ExtractNodeUtils {
pulsarSource.getTenant() + "/" + pulsarSource.getNamespace() +
"/" + pulsarSource.getTopic();
Format format = parsingFormat(pulsarSource.getSerializationType(),
- pulsarSource.isWrapWithInlongMsg(),
pulsarSource.getDataSeparator());
+ pulsarSource.isWrapWithInlongMsg(),
+ pulsarSource.getDataSeparator(),
+ pulsarSource.isIgnoreParseError());
PulsarScanStartupMode startupMode =
PulsarScanStartupMode.forName(pulsarSource.getScanStartupMode());
final String primaryKey = pulsarSource.getPrimaryKey();
@@ -441,9 +446,14 @@ public class ExtractNodeUtils {
* @param serializationType data serialization, support: csv, json, canal,
avro, etc
* @param wrapWithInlongMsg whether wrap content with {@link
InLongMsgFormat}
* @param separatorStr the separator of data content
+ * @param ignoreParseErrors whether to ignore data that fails deserialization
* @return the format for serialized content
*/
- private static Format parsingFormat(String serializationType, boolean
wrapWithInlongMsg, String separatorStr) {
+ private static Format parsingFormat(
+ String serializationType,
+ boolean wrapWithInlongMsg,
+ String separatorStr,
+ boolean ignoreParseErrors) {
Format format;
DataTypeEnum dataType = DataTypeEnum.forType(serializationType);
switch (dataType) {
@@ -452,19 +462,25 @@ public class ExtractNodeUtils {
char dataSeparator = (char) Integer.parseInt(separatorStr);
separatorStr = Character.toString(dataSeparator);
}
- format = new CsvFormat(separatorStr);
+ CsvFormat csvFormat = new CsvFormat(separatorStr);
+ csvFormat.setIgnoreParseErrors(ignoreParseErrors);
+ format = csvFormat;
break;
case AVRO:
format = new AvroFormat();
break;
case JSON:
- format = new JsonFormat();
+ JsonFormat jsonFormat = new JsonFormat();
+ jsonFormat.setIgnoreParseErrors(ignoreParseErrors);
+ format = jsonFormat;
break;
case CANAL:
format = new CanalJsonFormat();
break;
case DEBEZIUM_JSON:
- format = new DebeziumJsonFormat();
+ DebeziumJsonFormat debeziumJsonFormat = new
DebeziumJsonFormat();
+ debeziumJsonFormat.setIgnoreParseErrors(ignoreParseErrors);
+ format = debeziumJsonFormat;
break;
case RAW:
format = new RawFormat();
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/StreamSource.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/StreamSource.java
index 69719a9c4..93b7a0938 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/StreamSource.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/StreamSource.java
@@ -110,6 +110,9 @@ public abstract class StreamSource extends StreamNode {
@ApiModelProperty("Sub source information of existing agents")
private List<SubSourceDTO> subSourceList;
+ @ApiModelProperty(value = "Whether to ignore the parse errors of field
value, true as default")
+ private boolean ignoreParseError;
+
public SourceRequest genSourceRequest() {
return null;
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
new file mode 100644
index 000000000..6186b7dc4
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.stream;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+
+import java.io.Serializable;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+
+/**
+ * Extended params, will be saved as JSON string
+ */
+@Data
+@EqualsAndHashCode(callSuper = false)
+@NoArgsConstructor
+@AllArgsConstructor
+@Builder
+@ApiModel("Inlong stream ext param info")
+public class InlongStreamExtParam implements Serializable {
+
+ @ApiModelProperty(value = "Whether to ignore the parse errors of field
value")
+ private boolean ignoreParseError;
+
+ @ApiModelProperty(value = "Whether the message body wrapped with
InlongMsg")
+ private boolean wrapWithInlongMsg;
+
+ /**
+ * Pack extended attributes into ExtParams
+ *
+ * @param request the request
+ * @return the packed extParams
+ */
+ public static String packExtParams(InlongStreamRequest request) {
+ InlongStreamExtParam extParam =
CommonBeanUtils.copyProperties(request, InlongStreamExtParam::new,
+ true);
+ return JsonUtils.toJsonString(extParam);
+ }
+
+ /**
+ * Unpack extended attributes from the extParams JSON string and copy
them into the target object; blank extParams are ignored.
+ *
+ * @param extParams the extParams value loaded from db
+ * @param targetObject the target object to fill
+ */
+ public static void unpackExtParams(
+ String extParams,
+ Object targetObject) {
+ if (StringUtils.isNoneBlank(extParams)) {
+ InlongStreamExtParam inlongStreamExtParam =
JsonUtils.parseObject(extParams, InlongStreamExtParam.class);
+ if (inlongStreamExtParam != null) {
+ CommonBeanUtils.copyProperties(inlongStreamExtParam,
targetObject, true);
+ }
+ }
+ }
+
+ /**
+ * Expand the extParams field and fill in {@link InlongStreamInfo}
+ *
+ * @param streamInfo the InlongStreamInfo to be filled
+ */
+ public static void unpackExtParams(InlongStreamInfo streamInfo) {
+ unpackExtParams(streamInfo.getExtParams(), streamInfo);
+ }
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java
index c17d59bfa..55fd159c8 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java
@@ -65,9 +65,6 @@ public class InlongStreamInfo extends BaseInlongStream {
@ApiModelProperty(value = "Data type, including: TEXT, KV, etc.")
private String dataType;
- @ApiModelProperty(value = "Whether the message body wrapped with
InlongMsg, 0: no, 1: yes (as default)")
- private Integer wrapWithInlongMsg;
-
@ApiModelProperty(value = "Data encoding format: UTF-8, GBK")
private String dataEncoding;
@@ -132,6 +129,12 @@ public class InlongStreamInfo extends BaseInlongStream {
@ApiModelProperty(value = "Version number")
private Integer version;
+ @ApiModelProperty(value = "Whether the message body wrapped with
InlongMsg")
+ private Boolean wrapWithInlongMsg = true;
+
+ @ApiModelProperty(value = "Whether to ignore the parse errors of field
value")
+ private Boolean ignoreParseError = true;
+
public InlongStreamRequest genRequest() {
return CommonBeanUtils.copyProperties(this, InlongStreamRequest::new);
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
index ecabc8232..b8061daff 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
@@ -101,4 +101,9 @@ public class InlongStreamRequest extends BaseInlongStream {
@ApiModelProperty(value = "Version number")
private Integer version;
+ @ApiModelProperty(value = "Whether to ignore the parse errors of field
value")
+ private boolean ignoreParseError = true;
+
+ @ApiModelProperty(value = "Whether the message body wrapped with
InlongMsg")
+ private boolean wrapWithInlongMsg = true;
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
index 62a858da7..6b6dff0cb 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
@@ -17,11 +17,10 @@
package org.apache.inlong.manager.service.source.kafka;
-import static
org.apache.inlong.manager.pojo.stream.InlongStreamInfo.ENABLE_WRAP_WITH_INLONG_MSG;
-
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import org.apache.inlong.common.enums.DataTypeEnum;
import org.apache.inlong.manager.common.consts.SourceType;
import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
@@ -111,6 +110,10 @@ public class KafkaSourceOperator extends
AbstractSourceOperator {
kafkaSource.setSourceName(streamId);
kafkaSource.setBootstrapServers(bootstrapServers);
kafkaSource.setTopic(streamInfo.getMqResource());
+ String serializationType =
DataTypeEnum.forType(streamInfo.getDataType()).getType();
+ kafkaSource.setSerializationType(serializationType);
+ kafkaSource.setIgnoreParseError(streamInfo.getIgnoreParseError());
+
for (StreamSource sourceInfo : streamSources) {
if (!Objects.equals(streamId, sourceInfo.getInlongStreamId()))
{
continue;
@@ -118,9 +121,7 @@ public class KafkaSourceOperator extends
AbstractSourceOperator {
kafkaSource.setSerializationType(sourceInfo.getSerializationType());
}
- Integer wrapWithInlongMsg = streamInfo.getWrapWithInlongMsg();
- kafkaSource.setWrapWithInlongMsg(
- null == wrapWithInlongMsg || ENABLE_WRAP_WITH_INLONG_MSG
== wrapWithInlongMsg);
+
kafkaSource.setWrapWithInlongMsg(streamInfo.getWrapWithInlongMsg());
kafkaSource.setAutoOffsetReset(KafkaOffset.EARLIEST.getName());
kafkaSource.setFieldList(streamInfo.getFieldList());
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
index 57600f0a1..a29020a0e 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
@@ -17,8 +17,6 @@
package org.apache.inlong.manager.service.source.pulsar;
-import static
org.apache.inlong.manager.pojo.stream.InlongStreamInfo.ENABLE_WRAP_WITH_INLONG_MSG;
-
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Objects;
import com.google.common.collect.Lists;
@@ -129,10 +127,10 @@ public class PulsarSourceOperator extends
AbstractSourceOperator {
pulsarSource.setAdminUrl(adminUrl);
pulsarSource.setServiceUrl(serviceUrl);
pulsarSource.setInlongComponent(true);
-
- Integer wrapWithInlongMsg = streamInfo.getWrapWithInlongMsg();
- pulsarSource.setWrapWithInlongMsg(
- null == wrapWithInlongMsg || ENABLE_WRAP_WITH_INLONG_MSG
== wrapWithInlongMsg);
+ String serializationType =
DataTypeEnum.forType(streamInfo.getDataType()).getType();
+ pulsarSource.setSerializationType(serializationType);
+
pulsarSource.setWrapWithInlongMsg(streamInfo.getWrapWithInlongMsg());
+ pulsarSource.setIgnoreParseError(streamInfo.getIgnoreParseError());
// set the token info
if (StringUtils.isNotBlank(pulsarCluster.getToken())) {
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java
index 09c71a6df..488c866d4 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.source.tubemq;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import org.apache.inlong.common.enums.DataTypeEnum;
import org.apache.inlong.manager.common.consts.SourceType;
import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
@@ -107,6 +108,10 @@ public class TubeMQSourceOperator extends
AbstractSourceOperator {
tubeMQSource.setTopic(streamInfo.getMqResource());
tubeMQSource.setGroupId(streamId);
tubeMQSource.setMasterRpc(masterRpc);
+ String serializationType =
DataTypeEnum.forType(streamInfo.getDataType()).getType();
+ tubeMQSource.setSerializationType(serializationType);
+ tubeMQSource.setIgnoreParseError(streamInfo.getIgnoreParseError());
+
for (StreamSource sourceInfo : streamSources) {
if (!Objects.equals(streamId, sourceInfo.getInlongStreamId()))
{
continue;
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
index 1c94b27e5..6a719e8c5 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
@@ -72,6 +72,9 @@ import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
+import static
org.apache.inlong.manager.pojo.stream.InlongStreamExtParam.packExtParams;
+import static
org.apache.inlong.manager.pojo.stream.InlongStreamExtParam.unpackExtParams;
+
/**
* Inlong stream service layer implementation
*/
@@ -121,11 +124,15 @@ public class InlongStreamServiceImpl implements
InlongStreamService {
streamEntity.setStatus(StreamStatus.NEW.getCode());
streamEntity.setCreator(operator);
streamEntity.setModifier(operator);
+ // Processing extended attributes
+ String extParam = packExtParams(request);
+ streamEntity.setExtParams(extParam);
streamMapper.insertSelective(streamEntity);
saveField(groupId, streamId, request.getFieldList());
- if (CollectionUtils.isNotEmpty(request.getExtList())) {
- saveOrUpdateExt(groupId, streamId, request.getExtList());
+ List<InlongStreamExtInfo> extList = request.getExtList();
+ if (CollectionUtils.isNotEmpty(extList)) {
+ saveOrUpdateExt(groupId, streamId, extList);
}
LOGGER.info("success to save inlong stream info for groupId={}",
groupId);
@@ -178,6 +185,9 @@ public class InlongStreamServiceImpl implements
InlongStreamService {
if (StringUtils.isEmpty(request.getMqResource())) {
request.setMqResource(streamId);
}
+ // Processing extended attributes
+ String extParams = packExtParams(request);
+ request.setExtParams(extParams);
// Processing inlong stream
InlongStreamEntity streamEntity =
CommonBeanUtils.copyProperties(request, InlongStreamEntity::new);
streamEntity.setStatus(StreamStatus.NEW.getCode());
@@ -186,8 +196,9 @@ public class InlongStreamServiceImpl implements
InlongStreamService {
// add record
streamMapper.insertSelective(streamEntity);
saveField(groupId, streamId, request.getFieldList());
- if (CollectionUtils.isNotEmpty(request.getExtList())) {
- saveOrUpdateExt(groupId, streamId, request.getExtList());
+ List<InlongStreamExtInfo> extList = request.getExtList();
+ if (CollectionUtils.isNotEmpty(extList)) {
+ saveOrUpdateExt(groupId, streamId, extList);
}
return streamEntity.getId();
}
@@ -215,9 +226,13 @@ public class InlongStreamServiceImpl implements
InlongStreamService {
InlongStreamInfo streamInfo =
CommonBeanUtils.copyProperties(streamEntity, InlongStreamInfo::new);
List<StreamField> streamFields = getStreamFields(groupId, streamId);
streamInfo.setFieldList(streamFields);
+ // load ext infos
List<InlongStreamExtEntity> extEntities =
streamExtMapper.selectByRelatedId(groupId, streamId);
List<InlongStreamExtInfo> exts =
CommonBeanUtils.copyListProperties(extEntities, InlongStreamExtInfo::new);
streamInfo.setExtList(exts);
+ // load extParams
+ unpackExtParams(streamEntity.getExtParams(), streamInfo);
+
List<StreamSink> sinkList = sinkService.listSink(groupId, streamId);
streamInfo.setSinkList(sinkList);
List<StreamSource> sourceList = sourceService.listSource(groupId,
streamId);
@@ -257,6 +272,9 @@ public class InlongStreamServiceImpl implements
InlongStreamService {
throw new BusinessException(ErrorCodeEnum.STREAM_NOT_FOUND);
}
InlongStreamInfo streamInfo =
CommonBeanUtils.copyProperties(streamEntity, InlongStreamInfo::new);
+ // Processing extParams
+ unpackExtParams(streamEntity.getExtParams(), streamInfo);
+ // Load fields
List<StreamField> streamFields = getStreamFields(groupId, streamId);
streamInfo.setFieldList(streamFields);
List<InlongStreamExtEntity> extEntities =
streamExtMapper.selectByRelatedId(groupId, streamId);
@@ -288,6 +306,8 @@ public class InlongStreamServiceImpl implements
InlongStreamService {
Collectors.toCollection(ArrayList::new)));
streamList.forEach(streamInfo -> {
String streamId = streamInfo.getInlongStreamId();
+ // Processing extParams
+ unpackExtParams(streamInfo.getExtParams(), streamInfo);
List<StreamField> fieldInfos = streamFieldMap.get(streamId);
streamInfo.setFieldList(fieldInfos);
List<InlongStreamExtInfo> extInfos = extInfoMap.get(streamId);
@@ -363,10 +383,12 @@ public class InlongStreamServiceImpl implements
InlongStreamService {
for (InlongStreamInfo streamInfo : streamInfoList) {
// Set the field information of the inlong stream
String streamId = streamInfo.getInlongStreamId();
+ unpackExtParams(streamInfo);
List<StreamField> streamFields = getStreamFields(groupId,
streamId);
streamInfo.setFieldList(streamFields);
+ List<InlongStreamExtEntity> extEntities =
streamExtMapper.selectByRelatedId(groupId, streamId);
List<InlongStreamExtInfo> streamExtInfos =
CommonBeanUtils.copyListProperties(
- streamExtMapper.selectByRelatedId(groupId, streamId),
InlongStreamExtInfo::new);
+ extEntities, InlongStreamExtInfo::new);
streamInfo.setExtList(streamExtInfos);
// query all valid stream sources
@@ -471,6 +493,9 @@ public class InlongStreamServiceImpl implements
InlongStreamService {
String.format("stream has already updated with groupId=%s,
streamId=%s, curVersion=%s",
streamEntity.getInlongGroupId(),
streamEntity.getInlongStreamId(), request.getVersion()));
}
+ // Processing extended attributes
+ String extParams = packExtParams(request);
+ request.setExtParams(extParams);
// update record
CommonBeanUtils.copyProperties(request, streamEntity, true);
streamEntity.setModifier(opInfo.getName());
@@ -480,7 +505,8 @@ public class InlongStreamServiceImpl implements
InlongStreamService {
// update stream fields
updateField(groupId, streamId, request.getFieldList());
// update stream extension infos
- saveOrUpdateExt(groupId, streamId, request.getExtList());
+ List<InlongStreamExtInfo> extList = request.getExtList();
+ saveOrUpdateExt(groupId, streamId, extList);
return true;
}
@@ -503,7 +529,9 @@ public class InlongStreamServiceImpl implements
InlongStreamService {
LOGGER.error(errMsg);
throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
}
-
+ // Processing extended attributes
+ String extParams = packExtParams(request);
+ request.setExtParams(extParams);
CommonBeanUtils.copyProperties(request, streamEntity, true);
streamEntity.setModifier(operator);
int rowCount = streamMapper.updateByIdentifierSelective(streamEntity);
@@ -514,7 +542,8 @@ public class InlongStreamServiceImpl implements
InlongStreamService {
// update stream fields
updateField(groupId, streamId, request.getFieldList());
// update stream extension infos
- saveOrUpdateExt(groupId, streamId, request.getExtList());
+ List<InlongStreamExtInfo> extList = request.getExtList();
+ saveOrUpdateExt(groupId, streamId, extList);
LOGGER.info("success to update inlong stream without check for
groupId={} streamId={}", groupId, streamId);
return true;