This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new f802afa684 [INLONG-9003][DataProxy][Manager] Support extended fields (#9004)
f802afa684 is described below

commit f802afa68468c3a7202e1082d2232217aa582936
Author: vernedeng <[email protected]>
AuthorDate: Sat Oct 7 14:29:26 2023 +0800

    [INLONG-9003][DataProxy][Manager] Support extended fields (#9004)
---
 .../apache/inlong/dataproxy/config/holder/MetaConfigHolder.java  | 2 ++
 .../java/org/apache/inlong/dataproxy/config/pojo/DataType.java   | 2 +-
 .../org/apache/inlong/dataproxy/config/pojo/IdTopicConfig.java   | 9 +++++++++
 inlong-manager/manager-dao/pom.xml                               | 8 --------
 .../manager-dao/src/main/resources/mappers/ClusterSetMapper.xml  | 1 +
 .../org/apache/inlong/manager/pojo/dataproxy/InlongStreamId.java | 9 +++++++++
 .../apache/inlong/manager/pojo/stream/InlongStreamExtParam.java  | 3 +++
 .../org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java  | 3 +++
 .../apache/inlong/manager/pojo/stream/InlongStreamRequest.java   | 3 +++
 .../manager/service/repository/DataProxyConfigRepository.java    | 7 ++++++-
 .../manager/service/repository/DataProxyConfigRepositoryV2.java  | 7 ++++++-
 11 files changed, 43 insertions(+), 11 deletions(-)

diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java
index 3c1edbb6d6..369e7b2212 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java
@@ -411,6 +411,8 @@ public class MetaConfigHolder extends ConfigHolder {
                     idObject.getParams().getOrDefault("dataType", 
DataType.TEXT.value())));
             
tmpConfig.setFieldDelimiter(idObject.getParams().getOrDefault("fieldDelimiter", 
"|"));
             
tmpConfig.setFileDelimiter(idObject.getParams().getOrDefault("fileDelimiter", 
"\n"));
+            tmpConfig.setUseExtendedFields(Boolean.valueOf(
+                    idObject.getParams().getOrDefault("useExtendedFields", 
"false")));
             tmpTopicConfigMap.put(tmpConfig.getUid(), tmpConfig);
             if (mqType.equals(CacheType.TUBE)
                     && !tmpConfig.getUid().equals(tmpConfig.getInlongGroupId())
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/DataType.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/DataType.java
index b217856c3c..18491e7343 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/DataType.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/DataType.java
@@ -22,7 +22,7 @@ package org.apache.inlong.dataproxy.config.pojo;
  */
 public enum DataType {
 
-    TEXT("text"), PB("pb"), JCE("jce"), N("n");
+    TEXT("text"), PB("pb"), JCE("jce"), N("n"), CSV("csv"), KV("kv");
 
     private final String value;
 
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/IdTopicConfig.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/IdTopicConfig.java
index 95aea2a78a..31646fd3b5 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/IdTopicConfig.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/IdTopicConfig.java
@@ -40,6 +40,7 @@ public class IdTopicConfig {
     private DataType dataType = DataType.TEXT;
     private String fieldDelimiter = "|";
     private String fileDelimiter = "\n";
+    private Boolean useExtendedFields = false;
 
     private Map<String, String> params = new HashMap<>();
 
@@ -47,6 +48,14 @@ public class IdTopicConfig {
 
     }
 
+    public Boolean getUseExtendedFields() {
+        return useExtendedFields;
+    }
+
+    public void setUseExtendedFields(Boolean useExtendedFields) {
+        this.useExtendedFields = useExtendedFields;
+    }
+
     /**
      * get uid
      * @return the uid
diff --git a/inlong-manager/manager-dao/pom.xml 
b/inlong-manager/manager-dao/pom.xml
index 7848b6dd26..18e4dd06d4 100644
--- a/inlong-manager/manager-dao/pom.xml
+++ b/inlong-manager/manager-dao/pom.xml
@@ -65,14 +65,6 @@
                     <groupId>org.springframework.boot</groupId>
                     <artifactId>spring-boot-autoconfigure</artifactId>
                 </exclusion>
-                <exclusion>
-                    <groupId>com.sun</groupId>
-                    <artifactId>tools</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.sun</groupId>
-                    <artifactId>jconsole</artifactId>
-                </exclusion>
             </exclusions>
         </dependency>
         <dependency>
diff --git 
a/inlong-manager/manager-dao/src/main/resources/mappers/ClusterSetMapper.xml 
b/inlong-manager/manager-dao/src/main/resources/mappers/ClusterSetMapper.xml
index b3c08fb457..f22e23a231 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/ClusterSetMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/ClusterSetMapper.xml
@@ -48,6 +48,7 @@
     <select id="selectInlongStreamId" 
resultType="org.apache.inlong.manager.pojo.dataproxy.InlongStreamId">
         select inlong_group_id,
                inlong_stream_id,
+               dataType,
                mq_resource as topic,
                ext_params
         from inlong_stream
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/dataproxy/InlongStreamId.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/dataproxy/InlongStreamId.java
index 0ac7baf837..2d329c3426 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/dataproxy/InlongStreamId.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/dataproxy/InlongStreamId.java
@@ -25,8 +25,17 @@ public class InlongStreamId {
     private String inlongGroupId;
     private String inlongStreamId;
     private String topic;
+    private String dataType;
     private String extParams;
 
+    public String getDataType() {
+        return dataType;
+    }
+
+    public void setDataType(String dataType) {
+        this.dataType = dataType;
+    }
+
     /**
      * get inlongGroupId
      *
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
index 1e80e0bd7f..0dba8034af 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
@@ -45,6 +45,9 @@ public class InlongStreamExtParam implements Serializable {
     @ApiModelProperty(value = "Whether to ignore the parse errors of field 
value")
     private boolean ignoreParseError;
 
+    @ApiModelProperty(value = "If use extended fields")
+    private Boolean useExtendedFields = false;
+
     /**
      * Pack extended attributes into ExtParams
      *
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java
index 1a579d4960..f1c305c475 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java
@@ -133,6 +133,9 @@ public class InlongStreamInfo extends BaseInlongStream {
     @ApiModelProperty(value = "The message body wrap type, including: RAW, 
INLONG_MSG_V0, INLONG_MSG_V1, etc")
     private String wrapType;
 
+    @ApiModelProperty(value = "If use extended fields")
+    private Boolean useExtendedFields = false;
+
     @ApiModelProperty(value = "Whether to ignore the parse errors of field 
value")
     private Boolean ignoreParseError = true;
 
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
index e2944329f4..4ad91f17af 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
@@ -121,6 +121,9 @@ public class InlongStreamRequest extends BaseInlongStream {
     @ApiModelProperty(value = "Whether to ignore the parse errors of field 
value")
     private boolean ignoreParseError = true;
 
+    @ApiModelProperty(value = "If use extended fields")
+    private Boolean useExtendedFields = false;
+
     @ApiModelProperty(value = "The message body  wrap type, including: RAW, 
INLONG_MSG_V0, INLONG_MSG_V1, PB, etc")
     private String wrapType;
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
index de793b63bd..a0a38facfe 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
@@ -91,6 +91,7 @@ public class DataProxyConfigRepository implements IRepository 
{
     public static final String KEY_NAMESPACE = "namespace";
     public static final String KEY_NEW_TENANT_KEY = "pulsarTenant";
     public static final String KEY_OLD_TENANT_KEY = "tenant";
+    public static final String KEY_DATA_TYPE = "dataType";
     public static final String KEY_BACKUP_CLUSTER_TAG = "backup_cluster_tag";
     public static final String KEY_BACKUP_TOPIC = "backup_topic";
     public static final String KEY_SORT_TASK_NAME = "defaultSortTaskName";
@@ -372,7 +373,11 @@ public class DataProxyConfigRepository implements 
IRepository {
                 .forEach(v -> 
streamIdMap.put(getInlongId(v.getInlongGroupId(), v.getInlongStreamId()), v));
         // reload inlong stream ext params
         Map<String, Map<String, String>> streamParams = new HashMap<>();
-        streamIdMap.forEach((k, v) -> streamParams.put(k, 
fromJsonToMap(v.getExtParams())));
+        streamIdMap.forEach((k, v) -> {
+            Map<String, String> params = fromJsonToMap(v.getExtParams());
+            params.computeIfAbsent(KEY_DATA_TYPE, type -> v.getDataType());
+            streamParams.put(k, params);
+        });
         // reload inlong stream ext
         List<InlongStreamExtEntity> streamExtCursor = sortConfigLoader
                 .loadStreamBackupInfo(ClusterSwitch.BACKUP_MQ_RESOURCE);
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepositoryV2.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepositoryV2.java
index d501e62f96..e169efc9b7 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepositoryV2.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepositoryV2.java
@@ -90,6 +90,7 @@ public class DataProxyConfigRepositoryV2 implements 
IRepository {
     public static final String KEY_NAMESPACE = "namespace";
     public static final String KEY_BACKUP_CLUSTER_TAG = "backup_cluster_tag";
     public static final String KEY_BACKUP_TOPIC = "backup_topic";
+    public static final String KEY_DATA_TYPE = "dataType";
     public static final String KEY_SORT_TASK_NAME = "defaultSortTaskName";
     public static final String KEY_DATA_NODE_NAME = "defaultDataNodeName";
     public static final String KEY_SORT_CONSUMER_GROUP = 
"defaultSortConsumerGroup";
@@ -363,7 +364,11 @@ public class DataProxyConfigRepositoryV2 implements 
IRepository {
                 .forEach(v -> 
streamIdMap.put(getInlongId(v.getInlongGroupId(), v.getInlongStreamId()), v));
         // reload inlong stream ext params
         Map<String, Map<String, String>> streamParams = new HashMap<>();
-        streamIdMap.forEach((k, v) -> streamParams.put(k, 
fromJsonToMap(v.getExtParams())));
+        streamIdMap.forEach((k, v) -> {
+            Map<String, String> params = fromJsonToMap(v.getExtParams());
+            params.computeIfAbsent(KEY_DATA_TYPE, type -> v.getDataType());
+            streamParams.put(k, params);
+        });
         // reload inlong stream ext
         List<InlongStreamExtEntity> streamExtCursor = sortConfigLoader
                 .loadStreamBackupInfo(ClusterSwitch.BACKUP_MQ_RESOURCE);

Reply via email to