This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 3c703dd1d3 [INLONG-10135][Manager] Move inlongCompressType to
clustertag configuration (#10137)
3c703dd1d3 is described below
commit 3c703dd1d3b9f2b887594313dd0f3ecbc99b895e
Author: fuweng11 <[email protected]>
AuthorDate: Wed May 8 10:06:18 2024 +0800
[INLONG-10135][Manager] Move inlongCompressType to clustertag configuration
(#10137)
---
.../manager/dao/mapper/ClusterSetMapper.java | 4 +
.../main/resources/mappers/ClusterSetMapper.xml | 6 ++
.../manager/pojo/cluster/ClusterTagRequest.java | 3 +
.../manager/pojo/cluster/ClusterTagResponse.java | 3 +
.../pojo/cluster/InlongClusterTagExtParam.java | 114 +++++++++++++++++++++
.../manager/pojo/stream/InlongStreamBriefInfo.java | 3 -
.../manager/pojo/stream/InlongStreamExtParam.java | 3 -
.../manager/pojo/stream/InlongStreamInfo.java | 3 -
.../manager/pojo/stream/InlongStreamRequest.java | 3 -
.../service/cluster/InlongClusterServiceImpl.java | 20 ++++
.../repository/DataProxyConfigRepository.java | 26 ++++-
.../repository/DataProxyConfigRepositoryV2.java | 33 +++++-
12 files changed, 203 insertions(+), 18 deletions(-)
diff --git
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ClusterSetMapper.java
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ClusterSetMapper.java
index 3c79e13875..acc82a4684 100644
---
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ClusterSetMapper.java
+++
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ClusterSetMapper.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.dao.mapper;
+import org.apache.inlong.manager.dao.entity.InlongClusterTagEntity;
import org.apache.inlong.manager.pojo.dataproxy.CacheCluster;
import org.apache.inlong.manager.pojo.dataproxy.InlongGroupId;
import org.apache.inlong.manager.pojo.dataproxy.InlongStreamId;
@@ -39,4 +40,7 @@ public interface ClusterSetMapper {
List<InlongGroupId> selectInlongGroupId();
List<InlongStreamId> selectInlongStreamId();
+
+ List<InlongClusterTagEntity> selectInlongClusterTag();
+
}
diff --git
a/inlong-manager/manager-dao/src/main/resources/mappers/ClusterSetMapper.xml
b/inlong-manager/manager-dao/src/main/resources/mappers/ClusterSetMapper.xml
index 04c787d3dd..f0fd616973 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/ClusterSetMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/ClusterSetMapper.xml
@@ -55,4 +55,10 @@
from inlong_stream
where is_deleted = 0
</select>
+ <select id="selectInlongClusterTag"
resultType="org.apache.inlong.manager.dao.entity.InlongClusterTagEntity">
+ select cluster_tag,
+ ext_params
+ from inlong_cluster_tag
+ where is_deleted = 0
+ </select>
</mapper>
\ No newline at end of file
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterTagRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterTagRequest.java
index 9d9e425d50..e8592fea72 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterTagRequest.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterTagRequest.java
@@ -55,6 +55,9 @@ public class ClusterTagRequest {
@Length(min = 1, max = 163840, message = "length must be between 1 and
163840")
private String extParams;
+ @ApiModelProperty(value = "The compression type used for dataproxy and
sort side data transmission to reduce the network IO overhead")
+ private String inlongCompressType;
+
@ApiModelProperty(value = "Description of the cluster tag")
@Length(max = 256, message = "length must be less than or equal to 256")
private String description;
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterTagResponse.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterTagResponse.java
index c901a24fac..15528700df 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterTagResponse.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterTagResponse.java
@@ -50,6 +50,9 @@ public class ClusterTagResponse {
@ApiModelProperty(value = "Extended params")
private String extParams;
+ @ApiModelProperty(value = "The compression type used for dataproxy and
sort side data transmission to reduce the network IO overhead")
+ private String inlongCompressType;
+
@ApiModelProperty(value = "Description of the cluster tag")
private String description;
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/InlongClusterTagExtParam.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/InlongClusterTagExtParam.java
new file mode 100644
index 0000000000..4002312f2e
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/InlongClusterTagExtParam.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.cluster;
+
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonPrimitive;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.Serializable;
+
+/**
+ * Extended params, will be saved as JSON string
+ */
+@Data
+@EqualsAndHashCode(callSuper = false)
+@NoArgsConstructor
+@AllArgsConstructor
+@Builder
+@ApiModel("Inlong cluster tag ext param info")
+public class InlongClusterTagExtParam implements Serializable {
+
+ private static final Gson GSON = new Gson();
+
+ @ApiModelProperty(value = "The compression type used for dataproxy and
sort side data transmission to reduce the network IO overhead")
+ private String inlongCompressType = "NONE";
+
+ /**
+ * Pack extended attributes into ExtParams
+ *
+ * @param request the request
+ * @return the packed extParams
+ */
+ public static String packExtParams(ClusterTagRequest request) {
+ InlongClusterTagExtParam extParam =
CommonBeanUtils.copyProperties(request, InlongClusterTagExtParam::new,
+ true);
+ JsonObject obj = GSON.fromJson(JsonUtils.toJsonString(extParam),
JsonObject.class);
+ if (StringUtils.isBlank(request.getExtParams())) {
+ return obj.toString();
+ }
+ JsonObject existObj = GSON.fromJson(request.getExtParams(),
JsonObject.class);
+ for (String key : obj.keySet()) {
+ JsonElement child = obj.get(key);
+ if (child.isJsonNull()) {
+ continue;
+ } else if (child.isJsonPrimitive()) {
+ JsonPrimitive jsonPrimitive = child.getAsJsonPrimitive();
+ if (jsonPrimitive.isBoolean()) {
+ existObj.addProperty(key, child.getAsBoolean());
+ } else if (jsonPrimitive.isNumber()) {
+ existObj.addProperty(key, child.getAsInt());
+ } else {
+ existObj.addProperty(key, child.getAsString());
+ }
+ } else {
+ existObj.addProperty(key, child.toString());
+ }
+ }
+ return existObj.toString();
+ }
+
+ /**
+ * Unpack extended attributes from {@link ClusterTagResponse}, will remove
target attributes from it.
+ *
+ * @param extParams the extParams value load from db
+ * @param targetObject the targetObject with to fill up
+ */
+ public static void unpackExtParams(
+ String extParams,
+ Object targetObject) {
+ if (StringUtils.isNotBlank(extParams)) {
+ InlongClusterTagExtParam inlongClusterTagExtParam =
+ JsonUtils.parseObject(extParams,
InlongClusterTagExtParam.class);
+ if (inlongClusterTagExtParam != null) {
+ CommonBeanUtils.copyProperties(inlongClusterTagExtParam,
targetObject, true);
+ }
+ }
+ }
+
+ /**
+ * Expand extParam filed, and fill in {@link ClusterTagResponse}
+ *
+ * @param clusterTagResponse the clusterTagResponse need to filled
+ */
+ public static void unpackExtParams(ClusterTagResponse clusterTagResponse) {
+ unpackExtParams(clusterTagResponse.getExtParams(), clusterTagResponse);
+ }
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamBriefInfo.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamBriefInfo.java
index 61cea87a8a..646bbfb59f 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamBriefInfo.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamBriefInfo.java
@@ -85,9 +85,6 @@ public class InlongStreamBriefInfo {
@ApiModelProperty(value = "The message body wrap type, including: RAW,
INLONG_MSG_V0, INLONG_MSG_V1, etc")
private String wrapType;
- @ApiModelProperty(value = "The compression type used for dataproxy and
sort side data transmission to reduce the network IO overhead")
- private String inlongCompressType;
-
@ApiModelProperty(value = "Whether to ignore the parse errors of field
value")
private Boolean ignoreParseError;
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
index 0377b0d8f5..3358ab91eb 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
@@ -48,9 +48,6 @@ public class InlongStreamExtParam implements Serializable {
@ApiModelProperty(value = "If use extended fields")
private Boolean useExtendedFields = false;
- @ApiModelProperty(value = "The compression type used for dataproxy and
sort side data transmission to reduce the network IO overhead")
- private String inlongCompressType = "NONE";
-
@ApiModelProperty(value = "Predefined fields")
private String predefinedFields;
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java
index 6fd8ce2d08..0de7a41181 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java
@@ -133,9 +133,6 @@ public class InlongStreamInfo extends BaseInlongStream {
@ApiModelProperty(value = "The message body wrap type, including: RAW,
INLONG_MSG_V0, INLONG_MSG_V1, etc")
private String wrapType;
- @ApiModelProperty(value = "The compression type used for dataproxy and
sort side data transmission to reduce the network IO overhead")
- private String inlongCompressType;
-
@ApiModelProperty(value = "If use extended fields")
private Boolean useExtendedFields = false;
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
index b0c915a1ca..41fdcd983b 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
@@ -121,9 +121,6 @@ public class InlongStreamRequest extends BaseInlongStream {
@ApiModelProperty(value = "Whether to ignore the parse errors of field
value")
private Boolean ignoreParseError;
- @ApiModelProperty(value = "The compression type used for dataproxy and
sort side data transmission to reduce the network IO overhead")
- private String inlongCompressType;
-
@ApiModelProperty(value = "If use extended fields")
private Boolean useExtendedFields = false;
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
index 3733318817..206102f1e1 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
@@ -110,6 +110,9 @@ import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
+import static
org.apache.inlong.manager.pojo.cluster.InlongClusterTagExtParam.packExtParams;
+import static
org.apache.inlong.manager.pojo.cluster.InlongClusterTagExtParam.unpackExtParams;
+
/**
* Inlong cluster service layer implementation
*/
@@ -170,6 +173,9 @@ public class InlongClusterServiceImpl implements
InlongClusterService {
}
InlongClusterTagEntity entity =
CommonBeanUtils.copyProperties(request, InlongClusterTagEntity::new);
+ request.setExtParams(entity.getExtParams());
+ String extParam = packExtParams(request);
+ entity.setExtParams(extParam);
entity.setCreator(operator);
entity.setModifier(operator);
clusterTagMapper.insert(entity);
@@ -196,6 +202,9 @@ public class InlongClusterServiceImpl implements
InlongClusterService {
String.format("inlong cluster tag [%s] already exist",
request.getClusterTag()));
}
InlongClusterTagEntity entity =
CommonBeanUtils.copyProperties(request, InlongClusterTagEntity::new);
+ request.setExtParams(entity.getExtParams());
+ String extParam = packExtParams(request);
+ entity.setExtParams(extParam);
entity.setCreator(opInfo.getName());
entity.setModifier(opInfo.getName());
clusterTagMapper.insert(entity);
@@ -212,6 +221,7 @@ public class InlongClusterServiceImpl implements
InlongClusterService {
}
ClusterTagResponse response = CommonBeanUtils.copyProperties(entity,
ClusterTagResponse::new);
+ unpackExtParams(response);
List<String> tenantList = tenantClusterTagMapper
.selectByTag(entity.getClusterTag()).stream()
@@ -231,6 +241,8 @@ public class InlongClusterServiceImpl implements
InlongClusterService {
String.format("inlong cluster tag not found by id=%s",
id));
}
ClusterTagResponse response = CommonBeanUtils.copyProperties(entity,
ClusterTagResponse::new);
+ unpackExtParams(response);
+
List<String> tenantList = tenantClusterTagMapper
.selectByTag(entity.getClusterTag()).stream()
.map(TenantClusterTagEntity::getTenant)
@@ -249,6 +261,8 @@ public class InlongClusterServiceImpl implements
InlongClusterService {
PageResult<ClusterTagResponse> pageResult =
PageResult.fromPage(entityPage)
.map(entity -> {
ClusterTagResponse response =
CommonBeanUtils.copyProperties(entity, ClusterTagResponse::new);
+ unpackExtParams(response);
+
List<String> tenantList = tenantClusterTagMapper
.selectByTag(entity.getClusterTag()).stream()
.map(TenantClusterTagEntity::getTenant)
@@ -336,6 +350,9 @@ public class InlongClusterServiceImpl implements
InlongClusterService {
}
CommonBeanUtils.copyProperties(request, exist, true);
+ request.setExtParams(exist.getExtParams());
+ String extParams = packExtParams(request);
+ exist.setExtParams(extParams);
exist.setModifier(operator);
if (InlongConstants.AFFECTED_ONE_ROW !=
clusterTagMapper.updateByIdSelective(exist)) {
LOGGER.error(errMsg);
@@ -427,6 +444,9 @@ public class InlongClusterServiceImpl implements
InlongClusterService {
}
}
CommonBeanUtils.copyProperties(request, exist, true);
+ request.setExtParams(exist.getExtParams());
+ String extParams = packExtParams(request);
+ exist.setExtParams(extParams);
exist.setModifier(opInfo.getName());
if (InlongConstants.AFFECTED_ONE_ROW !=
clusterTagMapper.updateByIdSelective(exist)) {
throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
index 44245cb0c4..d7386dda69 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
@@ -31,6 +31,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
+import org.apache.inlong.manager.dao.entity.InlongClusterTagEntity;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.entity.InlongGroupExtEntity;
import org.apache.inlong.manager.dao.entity.InlongStreamExtEntity;
@@ -99,6 +100,7 @@ public class DataProxyConfigRepository implements
IRepository {
public static final String KEY_DATA_NODE_NAME = "defaultDataNodeName";
public static final String KEY_SORT_CONSUMER_GROUP =
"defaultSortConsumerGroup";
public static final String KEY_SINK_NAME = "defaultSinkName";
+ public static final String KEY_INLONG_COMPRESS_TYPE = "inlongCompressType";
public static final Splitter.MapSplitter MAP_SPLITTER =
Splitter.on(SEPARATOR).trimResults()
.withKeyValueSeparator(KEY_VALUE_SEPARATOR);
@@ -372,6 +374,15 @@ public class DataProxyConfigRepository implements
IRepository {
Map<String, InlongStreamId> streamIdMap = new HashMap<>();
clusterSetMapper.selectInlongStreamId()
.forEach(v ->
streamIdMap.put(getInlongId(v.getInlongGroupId(), v.getInlongStreamId()), v));
+
+ Map<String, InlongClusterTagEntity> clusterTagMap = new HashMap<>();
+ clusterSetMapper.selectInlongClusterTag().forEach(v ->
clusterTagMap.put(v.getClusterTag(), v));
+ // reload inlong stream ext params
+ Map<String, Map<String, String>> clusterTagParams = new HashMap<>();
+ clusterTagMap.forEach((k, v) -> {
+ Map<String, String> params = fromJsonToMap(v.getExtParams());
+ clusterTagParams.put(k, params);
+ });
// reload inlong stream ext params
Map<String, Map<String, String>> streamParams = new HashMap<>();
streamIdMap.forEach((k, v) -> {
@@ -389,7 +400,7 @@ public class DataProxyConfigRepository implements
IRepository {
// build Map<clusterTag, List<InlongIdObject>>
Map<String, List<InLongIdObject>> inlongIdMap =
this.parseInlongId(groupIdMap, groupParams, streamIdMap,
- streamParams);
+ streamParams, clusterTagParams);
// mark inlong id to proxy cluster
for (Entry<String, DataProxyCluster> entry :
proxyClusterMap.entrySet()) {
String clusterTag =
entry.getValue().getProxyCluster().getSetName();
@@ -405,7 +416,7 @@ public class DataProxyConfigRepository implements
IRepository {
*/
private Map<String, List<InLongIdObject>> parseInlongId(Map<String,
InlongGroupId> groupIdMap,
Map<String, Map<String, String>> groupParams, Map<String,
InlongStreamId> streamIdMap,
- Map<String, Map<String, String>> streamParams) {
+ Map<String, Map<String, String>> streamParams, Map<String,
Map<String, String>> clusterTagParams) {
Map<String, List<InLongIdObject>> inlongIdMap = new HashMap<>();
for (Entry<String, InlongStreamId> entry : streamIdMap.entrySet()) {
InlongStreamId streamIdObj = entry.getValue();
@@ -427,6 +438,11 @@ public class DataProxyConfigRepository implements
IRepository {
obj.setTopic(streamIdObj.getTopic());
obj.getParams().put(KEY_NAMESPACE, groupIdObj.getTopic());
}
+ Map<String, String> tagParamMap =
clusterTagParams.get(groupIdObj.getClusterTag());
+ if (tagParamMap != null &&
StringUtils.isNotBlank(tagParamMap.get(KEY_INLONG_COMPRESS_TYPE))) {
+ obj.getParams().put(KEY_INLONG_COMPRESS_TYPE,
tagParamMap.get(KEY_INLONG_COMPRESS_TYPE));
+ }
+
inlongIdMap.computeIfAbsent(groupIdObj.getClusterTag(), k -> new
ArrayList<>()).add(obj);
// backup
InLongIdObject backupObj = new InLongIdObject();
@@ -445,6 +461,12 @@ public class DataProxyConfigRepository implements
IRepository {
} else {
backupObj.setTopic(groupMqResource);
}
+ Map<String, String> backUpTagParamMap =
clusterTagParams.get(groupIdObj.getClusterTag());
+ if (backUpTagParamMap != null
+ &&
StringUtils.isNotBlank(backUpTagParamMap.get(KEY_INLONG_COMPRESS_TYPE))) {
+ backupObj.getParams().put(KEY_INLONG_COMPRESS_TYPE,
+ backUpTagParamMap.get(KEY_INLONG_COMPRESS_TYPE));
+ }
inlongIdMap.computeIfAbsent(clusterTag, k -> new
ArrayList<>()).add(backupObj);
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepositoryV2.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepositoryV2.java
index f9b5b6aa67..fda6e0810d 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepositoryV2.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepositoryV2.java
@@ -31,6 +31,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
+import org.apache.inlong.manager.dao.entity.InlongClusterTagEntity;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.entity.InlongGroupExtEntity;
import org.apache.inlong.manager.dao.entity.InlongStreamExtEntity;
@@ -95,6 +96,7 @@ public class DataProxyConfigRepositoryV2 implements
IRepository {
public static final String KEY_DATA_NODE_NAME = "defaultDataNodeName";
public static final String KEY_SORT_CONSUMER_GROUP =
"defaultSortConsumerGroup";
public static final String KEY_SINK_NAME = "defaultSinkName";
+ public static final String KEY_INLONG_COMPRESS_TYPE = "inlongCompressType";
public static final Splitter.MapSplitter MAP_SPLITTER =
Splitter.on(SEPARATOR).trimResults()
.withKeyValueSeparator(KEY_VALUE_SEPARATOR);
@@ -258,8 +260,10 @@ public class DataProxyConfigRepositoryV2 implements
IRepository {
if (StringUtils.equalsIgnoreCase(producerTag,
Boolean.TRUE.toString()) && StringUtils.isNotBlank(
cacheCluster.getClusterTags())) {
Set<String> clusterTags =
Sets.newHashSet(cacheCluster.getClusterTags().split(InlongConstants.COMMA));
- clusterTags.forEach(clusterTag ->
cacheClusterMap.computeIfAbsent(clusterTag, k -> new HashMap<>())
- .computeIfAbsent(cacheCluster.getExtTag(), k -> new
ArrayList<>()).add(cacheCluster));
+ clusterTags.forEach(clusterTag -> {
+ cacheClusterMap.computeIfAbsent(clusterTag, k -> new
HashMap<>())
+ .computeIfAbsent(cacheCluster.getExtTag(), k ->
new ArrayList<>()).add(cacheCluster);
+ });
}
}
// mark cache cluster to proxy cluster
@@ -356,6 +360,15 @@ public class DataProxyConfigRepositoryV2 implements
IRepository {
.loadGroupBackupInfo(ClusterSwitch.BACKUP_CLUSTER_TAG);
groupExtCursor.forEach(v ->
groupParams.computeIfAbsent(v.getInlongGroupId(), k -> new HashMap<>())
.put(ClusterSwitch.BACKUP_CLUSTER_TAG, v.getKeyValue()));
+
+ Map<String, InlongClusterTagEntity> clusterTagMap = new HashMap<>();
+ clusterSetMapper.selectInlongClusterTag().forEach(v ->
clusterTagMap.put(v.getClusterTag(), v));
+ // reload inlong stream ext params
+ Map<String, Map<String, String>> clusterTagParams = new HashMap<>();
+ clusterTagMap.forEach((k, v) -> {
+ Map<String, String> params = fromJsonToMap(v.getExtParams());
+ clusterTagParams.put(k, params);
+ });
// reload inlong stream id
Map<String, InlongStreamId> streamIdMap = new HashMap<>();
clusterSetMapper.selectInlongStreamId()
@@ -376,7 +389,7 @@ public class DataProxyConfigRepositoryV2 implements
IRepository {
// build Map<clusterTag, List<InlongIdObject>>
Map<String, List<InLongIdObject>> inlongIdMap =
this.parseInlongId(groupIdMap, groupParams, streamIdMap,
- streamParams);
+ streamParams, clusterTagParams);
// mark inlong id to proxy cluster
for (Entry<String, DataProxyCluster> entry :
proxyClusterMap.entrySet()) {
String clusterTag =
entry.getValue().getProxyCluster().getSetName();
@@ -392,7 +405,7 @@ public class DataProxyConfigRepositoryV2 implements
IRepository {
*/
private Map<String, List<InLongIdObject>> parseInlongId(Map<String,
InlongGroupId> groupIdMap,
Map<String, Map<String, String>> groupParams, Map<String,
InlongStreamId> streamIdMap,
- Map<String, Map<String, String>> streamParams) {
+ Map<String, Map<String, String>> streamParams, Map<String,
Map<String, String>> clusterTagParams) {
Map<String, List<InLongIdObject>> inlongIdMap = new HashMap<>();
for (Entry<String, InlongStreamId> entry : streamIdMap.entrySet()) {
InlongStreamId streamIdObj = entry.getValue();
@@ -408,12 +421,18 @@ public class DataProxyConfigRepositoryV2 implements
IRepository {
obj.setInlongId(inlongId);
Optional.ofNullable(groupParams.get(groupId)).ifPresent(v ->
obj.getParams().putAll(v));
Optional.ofNullable(streamParams.get(inlongId)).ifPresent(v ->
obj.getParams().putAll(v));
+
if (StringUtils.isBlank(streamIdObj.getTopic())) {
obj.setTopic(groupIdObj.getTopic());
} else {
obj.setTopic(streamIdObj.getTopic());
obj.getParams().put(KEY_NAMESPACE, groupIdObj.getTopic());
}
+ Map<String, String> clusterParamMap =
clusterTagParams.get(groupIdObj.getClusterTag());
+ if (clusterParamMap != null &&
StringUtils.isNotBlank(clusterParamMap.get(KEY_INLONG_COMPRESS_TYPE))) {
+ obj.getParams().put(KEY_INLONG_COMPRESS_TYPE,
clusterParamMap.get(KEY_INLONG_COMPRESS_TYPE));
+ }
+
inlongIdMap.computeIfAbsent(groupIdObj.getClusterTag(), k -> new
ArrayList<>()).add(obj);
// backup
InLongIdObject backupObj = new InLongIdObject();
@@ -432,6 +451,12 @@ public class DataProxyConfigRepositoryV2 implements
IRepository {
} else {
backupObj.setTopic(groupMqResource);
}
+ Map<String, String> backUpTagParamMap =
clusterTagParams.get(groupIdObj.getClusterTag());
+ if (backUpTagParamMap != null
+ &&
StringUtils.isNotBlank(backUpTagParamMap.get(KEY_INLONG_COMPRESS_TYPE))) {
+ backupObj.getParams().put(KEY_INLONG_COMPRESS_TYPE,
+ backUpTagParamMap.get(KEY_INLONG_COMPRESS_TYPE));
+ }
inlongIdMap.computeIfAbsent(clusterTag, k -> new
ArrayList<>()).add(backupObj);
}
}