This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new a9d7deb51 [INLONG-6961][Dashboard][Manager] Support for configing
non-InlongMsg formats (#6969)
a9d7deb51 is described below
commit a9d7deb513755eaa38cb1b21cdc7d54ff2534c10
Author: featzhang <[email protected]>
AuthorDate: Thu Dec 22 11:22:01 2022 +0800
[INLONG-6961][Dashboard][Manager] Support for configing non-InlongMsg
formats (#6969)
---
inlong-dashboard/src/locales/cn.json | 2 ++
inlong-dashboard/src/locales/en.json | 2 ++
.../src/metas/streams/common/StreamDefaultInfo.ts | 22 ++++++++++++++++++++++
.../manager/pojo/stream/InlongStreamInfo.java | 5 +++++
.../service/source/kafka/KafkaSourceOperator.java | 10 +++++-----
.../source/pulsar/PulsarSourceOperator.java | 10 +++++-----
6 files changed, 41 insertions(+), 10 deletions(-)
diff --git a/inlong-dashboard/src/locales/cn.json b/inlong-dashboard/src/locales/cn.json
index 7f3dceedb..a540ae490 100644
--- a/inlong-dashboard/src/locales/cn.json
+++ b/inlong-dashboard/src/locales/cn.json
@@ -375,6 +375,8 @@
"meta.Stream.ExecuteWorkflow": "执行工作流",
"meta.Stream.ExecuteConfirm": "确认执行工作流吗?",
"meta.Stream.ExecuteSuccess": "执行成功",
+ "meta.Stream.WrapWithInlongMsg": "InlongMsg 打包",
+ "meta.Stream.WrapWithInlongMsgHelp": "消息体使用 InlongMsg 打包",
"meta.Consume.ConsumerGroupName": "消费组名称",
"meta.Consume.ConsumerGroupNameRules": "只能包含小写字母、数字、中划线、下划线",
"meta.Consume.TopicName": "Topic 名称",
diff --git a/inlong-dashboard/src/locales/en.json b/inlong-dashboard/src/locales/en.json
index 12e448f9f..179cc2d37 100644
--- a/inlong-dashboard/src/locales/en.json
+++ b/inlong-dashboard/src/locales/en.json
@@ -375,6 +375,8 @@
"meta.Stream.ExecuteWorkflow": "ExecuteWorkflow",
"meta.Stream.ExecuteConfirm": "Are you sure to execute the workflow?",
"meta.Stream.ExecuteSuccess": "Execution Success",
+ "meta.Stream.WrapWithInlongMsg": "Wrap with InlongMsg",
+ "meta.Stream.WrapWithInlongMsgHelp": "The message body packaged with InlongMsg",
"meta.Consume.ConsumerGroupName": "Consumer Group Name",
"meta.Consume.TopicName": "Topic Name",
"meta.Consume.MQType": "MQ Type",
diff --git a/inlong-dashboard/src/metas/streams/common/StreamDefaultInfo.ts b/inlong-dashboard/src/metas/streams/common/StreamDefaultInfo.ts
index 843c15b9a..1ecd4a9cc 100644
--- a/inlong-dashboard/src/metas/streams/common/StreamDefaultInfo.ts
+++ b/inlong-dashboard/src/metas/streams/common/StreamDefaultInfo.ts
@@ -188,6 +188,28 @@ export class StreamDefaultInfo implements DataWithBackend, RenderRow, RenderList
@I18n('meta.Stream.DataSeparator')
dataSeparator: string;
+ @FieldDecorator({
+ type: 'radio',
+ isPro: true,
+ rules: [{ required: true }],
+ initialValue: 1,
+ tooltip: i18n.t('meta.Stream.WrapWithInlongMsgHelp'),
+ props: values => ({
+ options: [
+ {
+ label: i18n.t('basic.Yes'),
+ value: 1,
+ },
+ {
+ label: i18n.t('basic.No'),
+ value: 0,
+ },
+ ],
+ }),
+ })
+ @I18n('meta.Stream.WrapWithInlongMsg')
+ wrapWithInlongMsg: number;
+
@FieldDecorator({
type: EditableTable,
props: values => ({
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java
index 499dbe9b7..c17d59bfa 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java
@@ -42,6 +42,8 @@ import java.util.List;
@ApiModel("Inlong stream info")
public class InlongStreamInfo extends BaseInlongStream {
+ public static final int ENABLE_WRAP_WITH_INLONG_MSG = 1;
+
@ApiModelProperty(value = "Primary key")
private Integer id;
@@ -63,6 +65,9 @@ public class InlongStreamInfo extends BaseInlongStream {
@ApiModelProperty(value = "Data type, including: TEXT, KV, etc.")
private String dataType;
+ @ApiModelProperty(value = "Whether the message body wrapped with InlongMsg, 0: no, 1: yes (as default)")
+ private Integer wrapWithInlongMsg;
+
@ApiModelProperty(value = "Data encoding format: UTF-8, GBK")
private String dataEncoding;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
index 2f2f64326..62a858da7 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
@@ -17,6 +17,8 @@
package org.apache.inlong.manager.service.source.kafka;
+import static org.apache.inlong.manager.pojo.stream.InlongStreamInfo.ENABLE_WRAP_WITH_INLONG_MSG;
+
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -46,8 +48,6 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
-import static org.apache.inlong.manager.common.consts.InlongConstants.DATA_TYPE_RAW_PREFIX;
-
/**
* kafka stream source operator
*/
@@ -118,9 +118,9 @@ public class KafkaSourceOperator extends AbstractSourceOperator {
kafkaSource.setSerializationType(sourceInfo.getSerializationType());
}
- // CSV: InLong message type whose message body is raw CSV
- // Raw-CSV: messages are separated by a specific separator
- kafkaSource.setWrapWithInlongMsg(streamInfo.getDataType().startsWith(DATA_TYPE_RAW_PREFIX));
+ Integer wrapWithInlongMsg = streamInfo.getWrapWithInlongMsg();
+ kafkaSource.setWrapWithInlongMsg(
+ null == wrapWithInlongMsg || ENABLE_WRAP_WITH_INLONG_MSG == wrapWithInlongMsg);
kafkaSource.setAutoOffsetReset(KafkaOffset.EARLIEST.getName());
kafkaSource.setFieldList(streamInfo.getFieldList());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
index da0f4da73..57600f0a1 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
@@ -17,6 +17,8 @@
package org.apache.inlong.manager.service.source.pulsar;
+import static org.apache.inlong.manager.pojo.stream.InlongStreamInfo.ENABLE_WRAP_WITH_INLONG_MSG;
+
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Objects;
import com.google.common.collect.Lists;
@@ -50,8 +52,6 @@ import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Map;
-import static org.apache.inlong.manager.common.consts.InlongConstants.DATA_TYPE_RAW_PREFIX;
-
/**
* Pulsar stream source operator
*/
@@ -130,9 +130,9 @@ public class PulsarSourceOperator extends AbstractSourceOperator {
pulsarSource.setServiceUrl(serviceUrl);
pulsarSource.setInlongComponent(true);
- // CSV: InLong message type whose message body is raw CSV
- // Raw-CSV: messages are separated by a specific separator
- pulsarSource.setWrapWithInlongMsg(streamInfo.getDataType().startsWith(DATA_TYPE_RAW_PREFIX));
+ Integer wrapWithInlongMsg = streamInfo.getWrapWithInlongMsg();
+ pulsarSource.setWrapWithInlongMsg(
+ null == wrapWithInlongMsg || ENABLE_WRAP_WITH_INLONG_MSG == wrapWithInlongMsg);
// set the token info
if (StringUtils.isNotBlank(pulsarCluster.getToken())) {