This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new f802afa684 [INLONG-9003][DataProxy][Manager] Support extended fields (#9004)
f802afa684 is described below
commit f802afa68468c3a7202e1082d2232217aa582936
Author: vernedeng <[email protected]>
AuthorDate: Sat Oct 7 14:29:26 2023 +0800
[INLONG-9003][DataProxy][Manager] Support extended fields (#9004)
---
.../apache/inlong/dataproxy/config/holder/MetaConfigHolder.java | 2 ++
.../java/org/apache/inlong/dataproxy/config/pojo/DataType.java | 2 +-
.../org/apache/inlong/dataproxy/config/pojo/IdTopicConfig.java | 9 +++++++++
inlong-manager/manager-dao/pom.xml | 8 --------
.../manager-dao/src/main/resources/mappers/ClusterSetMapper.xml | 1 +
.../org/apache/inlong/manager/pojo/dataproxy/InlongStreamId.java | 9 +++++++++
.../apache/inlong/manager/pojo/stream/InlongStreamExtParam.java | 3 +++
.../org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java | 3 +++
.../apache/inlong/manager/pojo/stream/InlongStreamRequest.java | 3 +++
.../manager/service/repository/DataProxyConfigRepository.java | 7 ++++++-
.../manager/service/repository/DataProxyConfigRepositoryV2.java | 7 ++++++-
11 files changed, 43 insertions(+), 11 deletions(-)
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java
index 3c1edbb6d6..369e7b2212 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java
@@ -411,6 +411,8 @@ public class MetaConfigHolder extends ConfigHolder {
idObject.getParams().getOrDefault("dataType",
DataType.TEXT.value())));
tmpConfig.setFieldDelimiter(idObject.getParams().getOrDefault("fieldDelimiter",
"|"));
tmpConfig.setFileDelimiter(idObject.getParams().getOrDefault("fileDelimiter",
"\n"));
+ tmpConfig.setUseExtendedFields(Boolean.valueOf(
+ idObject.getParams().getOrDefault("useExtendedFields",
"false")));
tmpTopicConfigMap.put(tmpConfig.getUid(), tmpConfig);
if (mqType.equals(CacheType.TUBE)
&& !tmpConfig.getUid().equals(tmpConfig.getInlongGroupId())
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/DataType.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/DataType.java
index b217856c3c..18491e7343 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/DataType.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/DataType.java
@@ -22,7 +22,7 @@ package org.apache.inlong.dataproxy.config.pojo;
*/
public enum DataType {
- TEXT("text"), PB("pb"), JCE("jce"), N("n");
+ TEXT("text"), PB("pb"), JCE("jce"), N("n"), CSV("csv"), KV("kv");
private final String value;
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/IdTopicConfig.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/IdTopicConfig.java
index 95aea2a78a..31646fd3b5 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/IdTopicConfig.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/IdTopicConfig.java
@@ -40,6 +40,7 @@ public class IdTopicConfig {
private DataType dataType = DataType.TEXT;
private String fieldDelimiter = "|";
private String fileDelimiter = "\n";
+ private Boolean useExtendedFields = false;
private Map<String, String> params = new HashMap<>();
@@ -47,6 +48,14 @@ public class IdTopicConfig {
}
+ public Boolean getUseExtendedFields() {
+ return useExtendedFields;
+ }
+
+ public void setUseExtendedFields(Boolean useExtendedFields) {
+ this.useExtendedFields = useExtendedFields;
+ }
+
/**
* get uid
* @return the uid
diff --git a/inlong-manager/manager-dao/pom.xml b/inlong-manager/manager-dao/pom.xml
index 7848b6dd26..18e4dd06d4 100644
--- a/inlong-manager/manager-dao/pom.xml
+++ b/inlong-manager/manager-dao/pom.xml
@@ -65,14 +65,6 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
</exclusion>
- <exclusion>
- <groupId>com.sun</groupId>
- <artifactId>tools</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.sun</groupId>
- <artifactId>jconsole</artifactId>
- </exclusion>
</exclusions>
</dependency>
<dependency>
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/ClusterSetMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/ClusterSetMapper.xml
index b3c08fb457..f22e23a231 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/ClusterSetMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/ClusterSetMapper.xml
@@ -48,6 +48,7 @@
<select id="selectInlongStreamId"
resultType="org.apache.inlong.manager.pojo.dataproxy.InlongStreamId">
select inlong_group_id,
inlong_stream_id,
+ dataType,
mq_resource as topic,
ext_params
from inlong_stream
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/dataproxy/InlongStreamId.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/dataproxy/InlongStreamId.java
index 0ac7baf837..2d329c3426 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/dataproxy/InlongStreamId.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/dataproxy/InlongStreamId.java
@@ -25,8 +25,17 @@ public class InlongStreamId {
private String inlongGroupId;
private String inlongStreamId;
private String topic;
+ private String dataType;
private String extParams;
+ public String getDataType() {
+ return dataType;
+ }
+
+ public void setDataType(String dataType) {
+ this.dataType = dataType;
+ }
+
/**
* get inlongGroupId
*
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
index 1e80e0bd7f..0dba8034af 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
@@ -45,6 +45,9 @@ public class InlongStreamExtParam implements Serializable {
@ApiModelProperty(value = "Whether to ignore the parse errors of field
value")
private boolean ignoreParseError;
+ @ApiModelProperty(value = "If use extended fields")
+ private Boolean useExtendedFields = false;
+
/**
* Pack extended attributes into ExtParams
*
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java
index 1a579d4960..f1c305c475 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java
@@ -133,6 +133,9 @@ public class InlongStreamInfo extends BaseInlongStream {
@ApiModelProperty(value = "The message body wrap type, including: RAW,
INLONG_MSG_V0, INLONG_MSG_V1, etc")
private String wrapType;
+ @ApiModelProperty(value = "If use extended fields")
+ private Boolean useExtendedFields = false;
+
@ApiModelProperty(value = "Whether to ignore the parse errors of field
value")
private Boolean ignoreParseError = true;
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
index e2944329f4..4ad91f17af 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
@@ -121,6 +121,9 @@ public class InlongStreamRequest extends BaseInlongStream {
@ApiModelProperty(value = "Whether to ignore the parse errors of field
value")
private boolean ignoreParseError = true;
+ @ApiModelProperty(value = "If use extended fields")
+ private Boolean useExtendedFields = false;
+
@ApiModelProperty(value = "The message body wrap type, including: RAW,
INLONG_MSG_V0, INLONG_MSG_V1, PB, etc")
private String wrapType;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
index de793b63bd..a0a38facfe 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
@@ -91,6 +91,7 @@ public class DataProxyConfigRepository implements IRepository {
public static final String KEY_NAMESPACE = "namespace";
public static final String KEY_NEW_TENANT_KEY = "pulsarTenant";
public static final String KEY_OLD_TENANT_KEY = "tenant";
+ public static final String KEY_DATA_TYPE = "dataType";
public static final String KEY_BACKUP_CLUSTER_TAG = "backup_cluster_tag";
public static final String KEY_BACKUP_TOPIC = "backup_topic";
public static final String KEY_SORT_TASK_NAME = "defaultSortTaskName";
@@ -372,7 +373,11 @@ public class DataProxyConfigRepository implements IRepository {
.forEach(v ->
streamIdMap.put(getInlongId(v.getInlongGroupId(), v.getInlongStreamId()), v));
// reload inlong stream ext params
Map<String, Map<String, String>> streamParams = new HashMap<>();
- streamIdMap.forEach((k, v) -> streamParams.put(k,
fromJsonToMap(v.getExtParams())));
+ streamIdMap.forEach((k, v) -> {
+ Map<String, String> params = fromJsonToMap(v.getExtParams());
+ params.computeIfAbsent(KEY_DATA_TYPE, type -> v.getDataType());
+ streamParams.put(k, params);
+ });
// reload inlong stream ext
List<InlongStreamExtEntity> streamExtCursor = sortConfigLoader
.loadStreamBackupInfo(ClusterSwitch.BACKUP_MQ_RESOURCE);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepositoryV2.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepositoryV2.java
index d501e62f96..e169efc9b7 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepositoryV2.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepositoryV2.java
@@ -90,6 +90,7 @@ public class DataProxyConfigRepositoryV2 implements IRepository {
public static final String KEY_NAMESPACE = "namespace";
public static final String KEY_BACKUP_CLUSTER_TAG = "backup_cluster_tag";
public static final String KEY_BACKUP_TOPIC = "backup_topic";
+ public static final String KEY_DATA_TYPE = "dataType";
public static final String KEY_SORT_TASK_NAME = "defaultSortTaskName";
public static final String KEY_DATA_NODE_NAME = "defaultDataNodeName";
public static final String KEY_SORT_CONSUMER_GROUP =
"defaultSortConsumerGroup";
@@ -363,7 +364,11 @@ public class DataProxyConfigRepositoryV2 implements IRepository {
.forEach(v ->
streamIdMap.put(getInlongId(v.getInlongGroupId(), v.getInlongStreamId()), v));
// reload inlong stream ext params
Map<String, Map<String, String>> streamParams = new HashMap<>();
- streamIdMap.forEach((k, v) -> streamParams.put(k,
fromJsonToMap(v.getExtParams())));
+ streamIdMap.forEach((k, v) -> {
+ Map<String, String> params = fromJsonToMap(v.getExtParams());
+ params.computeIfAbsent(KEY_DATA_TYPE, type -> v.getDataType());
+ streamParams.put(k, params);
+ });
// reload inlong stream ext
List<InlongStreamExtEntity> streamExtCursor = sortConfigLoader
.loadStreamBackupInfo(ClusterSwitch.BACKUP_MQ_RESOURCE);