This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new a9d7deb51 [INLONG-6961][Dashboard][Manager] Support for configing 
non-InlongMsg formats (#6969)
a9d7deb51 is described below

commit a9d7deb513755eaa38cb1b21cdc7d54ff2534c10
Author: featzhang <[email protected]>
AuthorDate: Thu Dec 22 11:22:01 2022 +0800

    [INLONG-6961][Dashboard][Manager] Support for configing non-InlongMsg 
formats (#6969)
---
 inlong-dashboard/src/locales/cn.json               |  2 ++
 inlong-dashboard/src/locales/en.json               |  2 ++
 .../src/metas/streams/common/StreamDefaultInfo.ts  | 22 ++++++++++++++++++++++
 .../manager/pojo/stream/InlongStreamInfo.java      |  5 +++++
 .../service/source/kafka/KafkaSourceOperator.java  | 10 +++++-----
 .../source/pulsar/PulsarSourceOperator.java        | 10 +++++-----
 6 files changed, 41 insertions(+), 10 deletions(-)

diff --git a/inlong-dashboard/src/locales/cn.json 
b/inlong-dashboard/src/locales/cn.json
index 7f3dceedb..a540ae490 100644
--- a/inlong-dashboard/src/locales/cn.json
+++ b/inlong-dashboard/src/locales/cn.json
@@ -375,6 +375,8 @@
   "meta.Stream.ExecuteWorkflow": "执行工作流",
   "meta.Stream.ExecuteConfirm": "确认执行工作流吗?",
   "meta.Stream.ExecuteSuccess": "执行成功",
+  "meta.Stream.WrapWithInlongMsg": "InlongMsg 打包",
+  "meta.Stream.WrapWithInlongMsgHelp": "消息体使用 InlongMsg 打包",
   "meta.Consume.ConsumerGroupName": "消费组名称",
   "meta.Consume.ConsumerGroupNameRules": "只能包含小写字母、数字、中划线、下划线",
   "meta.Consume.TopicName": "Topic 名称",
diff --git a/inlong-dashboard/src/locales/en.json 
b/inlong-dashboard/src/locales/en.json
index 12e448f9f..179cc2d37 100644
--- a/inlong-dashboard/src/locales/en.json
+++ b/inlong-dashboard/src/locales/en.json
@@ -375,6 +375,8 @@
   "meta.Stream.ExecuteWorkflow": "ExecuteWorkflow",
   "meta.Stream.ExecuteConfirm": "Are you sure to execute the workflow?",
   "meta.Stream.ExecuteSuccess": "Execution Success",
+  "meta.Stream.WrapWithInlongMsg": "Wrap with InlongMsg",
+  "meta.Stream.WrapWithInlongMsgHelp": "The message body packaged with 
InlongMsg",
   "meta.Consume.ConsumerGroupName": "Consumer Group Name",
   "meta.Consume.TopicName": "Topic Name",
   "meta.Consume.MQType": "MQ Type",
diff --git a/inlong-dashboard/src/metas/streams/common/StreamDefaultInfo.ts 
b/inlong-dashboard/src/metas/streams/common/StreamDefaultInfo.ts
index 843c15b9a..1ecd4a9cc 100644
--- a/inlong-dashboard/src/metas/streams/common/StreamDefaultInfo.ts
+++ b/inlong-dashboard/src/metas/streams/common/StreamDefaultInfo.ts
@@ -188,6 +188,28 @@ export class StreamDefaultInfo implements DataWithBackend, 
RenderRow, RenderList
   @I18n('meta.Stream.DataSeparator')
   dataSeparator: string;
 
+  @FieldDecorator({
+    type: 'radio',
+    isPro: true,
+    rules: [{ required: true }],
+    initialValue: 1,
+    tooltip: i18n.t('meta.Stream.WrapWithInlongMsgHelp'),
+    props: values => ({
+      options: [
+        {
+          label: i18n.t('basic.Yes'),
+          value: 1,
+        },
+        {
+          label: i18n.t('basic.No'),
+          value: 0,
+        },
+      ],
+    }),
+  })
+  @I18n('meta.Stream.WrapWithInlongMsg')
+  wrapWithInlongMsg: number;
+
   @FieldDecorator({
     type: EditableTable,
     props: values => ({
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java
index 499dbe9b7..c17d59bfa 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java
@@ -42,6 +42,8 @@ import java.util.List;
 @ApiModel("Inlong stream info")
 public class InlongStreamInfo extends BaseInlongStream {
 
+    public static final int ENABLE_WRAP_WITH_INLONG_MSG = 1;
+
     @ApiModelProperty(value = "Primary key")
     private Integer id;
 
@@ -63,6 +65,9 @@ public class InlongStreamInfo extends BaseInlongStream {
     @ApiModelProperty(value = "Data type, including: TEXT, KV, etc.")
     private String dataType;
 
+    @ApiModelProperty(value = "Whether the message body wrapped with 
InlongMsg, 0: no, 1: yes (as default)")
+    private Integer wrapWithInlongMsg;
+
     @ApiModelProperty(value = "Data encoding format: UTF-8, GBK")
     private String dataEncoding;
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
index 2f2f64326..62a858da7 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.manager.service.source.kafka;
 
+import static 
org.apache.inlong.manager.pojo.stream.InlongStreamInfo.ENABLE_WRAP_WITH_INLONG_MSG;
+
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -46,8 +48,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
-import static 
org.apache.inlong.manager.common.consts.InlongConstants.DATA_TYPE_RAW_PREFIX;
-
 /**
  * kafka stream source operator
  */
@@ -118,9 +118,9 @@ public class KafkaSourceOperator extends 
AbstractSourceOperator {
                 
kafkaSource.setSerializationType(sourceInfo.getSerializationType());
             }
 
-            // CSV: InLong message type whose message body is raw CSV
-            // Raw-CSV: messages are separated by a specific separator
-            
kafkaSource.setWrapWithInlongMsg(streamInfo.getDataType().startsWith(DATA_TYPE_RAW_PREFIX));
+            Integer wrapWithInlongMsg = streamInfo.getWrapWithInlongMsg();
+            kafkaSource.setWrapWithInlongMsg(
+                    null == wrapWithInlongMsg || ENABLE_WRAP_WITH_INLONG_MSG 
== wrapWithInlongMsg);
 
             kafkaSource.setAutoOffsetReset(KafkaOffset.EARLIEST.getName());
             kafkaSource.setFieldList(streamInfo.getFieldList());
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
index da0f4da73..57600f0a1 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.manager.service.source.pulsar;
 
+import static 
org.apache.inlong.manager.pojo.stream.InlongStreamInfo.ENABLE_WRAP_WITH_INLONG_MSG;
+
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Objects;
 import com.google.common.collect.Lists;
@@ -50,8 +52,6 @@ import org.springframework.stereotype.Service;
 import java.util.List;
 import java.util.Map;
 
-import static 
org.apache.inlong.manager.common.consts.InlongConstants.DATA_TYPE_RAW_PREFIX;
-
 /**
  * Pulsar stream source operator
  */
@@ -130,9 +130,9 @@ public class PulsarSourceOperator extends 
AbstractSourceOperator {
             pulsarSource.setServiceUrl(serviceUrl);
             pulsarSource.setInlongComponent(true);
 
-            // CSV: InLong message type whose message body is raw CSV
-            // Raw-CSV: messages are separated by a specific separator
-            
pulsarSource.setWrapWithInlongMsg(streamInfo.getDataType().startsWith(DATA_TYPE_RAW_PREFIX));
+            Integer wrapWithInlongMsg = streamInfo.getWrapWithInlongMsg();
+            pulsarSource.setWrapWithInlongMsg(
+                    null == wrapWithInlongMsg || ENABLE_WRAP_WITH_INLONG_MSG 
== wrapWithInlongMsg);
 
             // set the token info
             if (StringUtils.isNotBlank(pulsarCluster.getToken())) {

Reply via email to