This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c703dd1d3 [INLONG-10135][Manager] Move inlongCompressType to 
clustertag configuration (#10137)
3c703dd1d3 is described below

commit 3c703dd1d3b9f2b887594313dd0f3ecbc99b895e
Author: fuweng11 <[email protected]>
AuthorDate: Wed May 8 10:06:18 2024 +0800

    [INLONG-10135][Manager] Move inlongCompressType to clustertag configuration 
(#10137)
---
 .../manager/dao/mapper/ClusterSetMapper.java       |   4 +
 .../main/resources/mappers/ClusterSetMapper.xml    |   6 ++
 .../manager/pojo/cluster/ClusterTagRequest.java    |   3 +
 .../manager/pojo/cluster/ClusterTagResponse.java   |   3 +
 .../pojo/cluster/InlongClusterTagExtParam.java     | 114 +++++++++++++++++++++
 .../manager/pojo/stream/InlongStreamBriefInfo.java |   3 -
 .../manager/pojo/stream/InlongStreamExtParam.java  |   3 -
 .../manager/pojo/stream/InlongStreamInfo.java      |   3 -
 .../manager/pojo/stream/InlongStreamRequest.java   |   3 -
 .../service/cluster/InlongClusterServiceImpl.java  |  20 ++++
 .../repository/DataProxyConfigRepository.java      |  26 ++++-
 .../repository/DataProxyConfigRepositoryV2.java    |  33 +++++-
 12 files changed, 203 insertions(+), 18 deletions(-)

diff --git 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ClusterSetMapper.java
 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ClusterSetMapper.java
index 3c79e13875..acc82a4684 100644
--- 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ClusterSetMapper.java
+++ 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ClusterSetMapper.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.dao.mapper;
 
+import org.apache.inlong.manager.dao.entity.InlongClusterTagEntity;
 import org.apache.inlong.manager.pojo.dataproxy.CacheCluster;
 import org.apache.inlong.manager.pojo.dataproxy.InlongGroupId;
 import org.apache.inlong.manager.pojo.dataproxy.InlongStreamId;
@@ -39,4 +40,7 @@ public interface ClusterSetMapper {
     List<InlongGroupId> selectInlongGroupId();
 
     List<InlongStreamId> selectInlongStreamId();
+
+    List<InlongClusterTagEntity> selectInlongClusterTag();
+
 }
diff --git 
a/inlong-manager/manager-dao/src/main/resources/mappers/ClusterSetMapper.xml 
b/inlong-manager/manager-dao/src/main/resources/mappers/ClusterSetMapper.xml
index 04c787d3dd..f0fd616973 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/ClusterSetMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/ClusterSetMapper.xml
@@ -55,4 +55,10 @@
         from inlong_stream
         where is_deleted = 0
     </select>
+    <select id="selectInlongClusterTag" 
resultType="org.apache.inlong.manager.dao.entity.InlongClusterTagEntity">
+        select cluster_tag,
+               ext_params
+        from inlong_cluster_tag
+        where is_deleted = 0
+    </select>
 </mapper>
\ No newline at end of file
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterTagRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterTagRequest.java
index 9d9e425d50..e8592fea72 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterTagRequest.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterTagRequest.java
@@ -55,6 +55,9 @@ public class ClusterTagRequest {
     @Length(min = 1, max = 163840, message = "length must be between 1 and 
163840")
     private String extParams;
 
+    @ApiModelProperty(value = "The compression type used for dataproxy and 
sort side data transmission to reduce the network IO overhead")
+    private String inlongCompressType;
+
     @ApiModelProperty(value = "Description of the cluster tag")
     @Length(max = 256, message = "length must be less than or equal to 256")
     private String description;
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterTagResponse.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterTagResponse.java
index c901a24fac..15528700df 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterTagResponse.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterTagResponse.java
@@ -50,6 +50,9 @@ public class ClusterTagResponse {
     @ApiModelProperty(value = "Extended params")
     private String extParams;
 
+    @ApiModelProperty(value = "The compression type used for dataproxy and 
sort side data transmission to reduce the network IO overhead")
+    private String inlongCompressType;
+
     @ApiModelProperty(value = "Description of the cluster tag")
     private String description;
 
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/InlongClusterTagExtParam.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/InlongClusterTagExtParam.java
new file mode 100644
index 0000000000..4002312f2e
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/InlongClusterTagExtParam.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.cluster;
+
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonPrimitive;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.Serializable;
+
+/**
+ * Extended params, will be saved as JSON string
+ */
+@Data
+@EqualsAndHashCode(callSuper = false)
+@NoArgsConstructor
+@AllArgsConstructor
+@Builder
+@ApiModel("Inlong cluster tag ext param info")
+public class InlongClusterTagExtParam implements Serializable {
+
+    private static final Gson GSON = new Gson();
+
+    @ApiModelProperty(value = "The compression type used for dataproxy and 
sort side data transmission to reduce the network IO overhead")
+    private String inlongCompressType = "NONE";
+
+    /**
+     * Pack extended attributes into ExtParams
+     *
+     * @param request the request
+     * @return the packed extParams
+     */
+    public static String packExtParams(ClusterTagRequest request) {
+        InlongClusterTagExtParam extParam = 
CommonBeanUtils.copyProperties(request, InlongClusterTagExtParam::new,
+                true);
+        JsonObject obj = GSON.fromJson(JsonUtils.toJsonString(extParam), 
JsonObject.class);
+        if (StringUtils.isBlank(request.getExtParams())) {
+            return obj.toString();
+        }
+        JsonObject existObj = GSON.fromJson(request.getExtParams(), 
JsonObject.class);
+        for (String key : obj.keySet()) {
+            JsonElement child = obj.get(key);
+            if (child.isJsonNull()) {
+                continue;
+            } else if (child.isJsonPrimitive()) {
+                JsonPrimitive jsonPrimitive = child.getAsJsonPrimitive();
+                if (jsonPrimitive.isBoolean()) {
+                    existObj.addProperty(key, child.getAsBoolean());
+                } else if (jsonPrimitive.isNumber()) {
+                    existObj.addProperty(key, child.getAsInt());
+                } else {
+                    existObj.addProperty(key, child.getAsString());
+                }
+            } else {
+                existObj.addProperty(key, child.toString());
+            }
+        }
+        return existObj.toString();
+    }
+
+    /**
+     * Unpack extended attributes from {@link ClusterTagResponse}, will remove 
target attributes from it.
+     *
+     * @param extParams the extParams value loaded from db
+     * @param targetObject the target object to fill in
+     */
+    public static void unpackExtParams(
+            String extParams,
+            Object targetObject) {
+        if (StringUtils.isNotBlank(extParams)) {
+            InlongClusterTagExtParam inlongClusterTagExtParam =
+                    JsonUtils.parseObject(extParams, 
InlongClusterTagExtParam.class);
+            if (inlongClusterTagExtParam != null) {
+                CommonBeanUtils.copyProperties(inlongClusterTagExtParam, 
targetObject, true);
+            }
+        }
+    }
+
+    /**
+     * Expand the extParams field, and fill in {@link ClusterTagResponse}
+     *
+     * @param clusterTagResponse the clusterTagResponse that needs to be filled
+     */
+    public static void unpackExtParams(ClusterTagResponse clusterTagResponse) {
+        unpackExtParams(clusterTagResponse.getExtParams(), clusterTagResponse);
+    }
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamBriefInfo.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamBriefInfo.java
index 61cea87a8a..646bbfb59f 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamBriefInfo.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamBriefInfo.java
@@ -85,9 +85,6 @@ public class InlongStreamBriefInfo {
     @ApiModelProperty(value = "The message body  wrap type, including: RAW, 
INLONG_MSG_V0, INLONG_MSG_V1, etc")
     private String wrapType;
 
-    @ApiModelProperty(value = "The compression type used for dataproxy and 
sort side data transmission to reduce the network IO overhead")
-    private String inlongCompressType;
-
     @ApiModelProperty(value = "Whether to ignore the parse errors of field 
value")
     private Boolean ignoreParseError;
 
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
index 0377b0d8f5..3358ab91eb 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
@@ -48,9 +48,6 @@ public class InlongStreamExtParam implements Serializable {
     @ApiModelProperty(value = "If use extended fields")
     private Boolean useExtendedFields = false;
 
-    @ApiModelProperty(value = "The compression type used for dataproxy and 
sort side data transmission to reduce the network IO overhead")
-    private String inlongCompressType = "NONE";
-
     @ApiModelProperty(value = "Predefined fields")
     private String predefinedFields;
 
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java
index 6fd8ce2d08..0de7a41181 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java
@@ -133,9 +133,6 @@ public class InlongStreamInfo extends BaseInlongStream {
     @ApiModelProperty(value = "The message body wrap type, including: RAW, 
INLONG_MSG_V0, INLONG_MSG_V1, etc")
     private String wrapType;
 
-    @ApiModelProperty(value = "The compression type used for dataproxy and 
sort side data transmission to reduce the network IO overhead")
-    private String inlongCompressType;
-
     @ApiModelProperty(value = "If use extended fields")
     private Boolean useExtendedFields = false;
 
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
index b0c915a1ca..41fdcd983b 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
@@ -121,9 +121,6 @@ public class InlongStreamRequest extends BaseInlongStream {
     @ApiModelProperty(value = "Whether to ignore the parse errors of field 
value")
     private Boolean ignoreParseError;
 
-    @ApiModelProperty(value = "The compression type used for dataproxy and 
sort side data transmission to reduce the network IO overhead")
-    private String inlongCompressType;
-
     @ApiModelProperty(value = "If use extended fields")
     private Boolean useExtendedFields = false;
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
index 3733318817..206102f1e1 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
@@ -110,6 +110,9 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
+import static 
org.apache.inlong.manager.pojo.cluster.InlongClusterTagExtParam.packExtParams;
+import static 
org.apache.inlong.manager.pojo.cluster.InlongClusterTagExtParam.unpackExtParams;
+
 /**
  * Inlong cluster service layer implementation
  */
@@ -170,6 +173,9 @@ public class InlongClusterServiceImpl implements 
InlongClusterService {
         }
 
         InlongClusterTagEntity entity = 
CommonBeanUtils.copyProperties(request, InlongClusterTagEntity::new);
+        request.setExtParams(entity.getExtParams());
+        String extParam = packExtParams(request);
+        entity.setExtParams(extParam);
         entity.setCreator(operator);
         entity.setModifier(operator);
         clusterTagMapper.insert(entity);
@@ -196,6 +202,9 @@ public class InlongClusterServiceImpl implements 
InlongClusterService {
                     String.format("inlong cluster tag [%s] already exist", 
request.getClusterTag()));
         }
         InlongClusterTagEntity entity = 
CommonBeanUtils.copyProperties(request, InlongClusterTagEntity::new);
+        request.setExtParams(entity.getExtParams());
+        String extParam = packExtParams(request);
+        entity.setExtParams(extParam);
         entity.setCreator(opInfo.getName());
         entity.setModifier(opInfo.getName());
         clusterTagMapper.insert(entity);
@@ -212,6 +221,7 @@ public class InlongClusterServiceImpl implements 
InlongClusterService {
         }
 
         ClusterTagResponse response = CommonBeanUtils.copyProperties(entity, 
ClusterTagResponse::new);
+        unpackExtParams(response);
 
         List<String> tenantList = tenantClusterTagMapper
                 .selectByTag(entity.getClusterTag()).stream()
@@ -231,6 +241,8 @@ public class InlongClusterServiceImpl implements 
InlongClusterService {
                     String.format("inlong cluster tag not found by id=%s", 
id));
         }
         ClusterTagResponse response = CommonBeanUtils.copyProperties(entity, 
ClusterTagResponse::new);
+        unpackExtParams(response);
+
         List<String> tenantList = tenantClusterTagMapper
                 .selectByTag(entity.getClusterTag()).stream()
                 .map(TenantClusterTagEntity::getTenant)
@@ -249,6 +261,8 @@ public class InlongClusterServiceImpl implements 
InlongClusterService {
         PageResult<ClusterTagResponse> pageResult = 
PageResult.fromPage(entityPage)
                 .map(entity -> {
                     ClusterTagResponse response = 
CommonBeanUtils.copyProperties(entity, ClusterTagResponse::new);
+                    unpackExtParams(response);
+
                     List<String> tenantList = tenantClusterTagMapper
                             .selectByTag(entity.getClusterTag()).stream()
                             .map(TenantClusterTagEntity::getTenant)
@@ -336,6 +350,9 @@ public class InlongClusterServiceImpl implements 
InlongClusterService {
         }
 
         CommonBeanUtils.copyProperties(request, exist, true);
+        request.setExtParams(exist.getExtParams());
+        String extParams = packExtParams(request);
+        exist.setExtParams(extParams);
         exist.setModifier(operator);
         if (InlongConstants.AFFECTED_ONE_ROW != 
clusterTagMapper.updateByIdSelective(exist)) {
             LOGGER.error(errMsg);
@@ -427,6 +444,9 @@ public class InlongClusterServiceImpl implements 
InlongClusterService {
             }
         }
         CommonBeanUtils.copyProperties(request, exist, true);
+        request.setExtParams(exist.getExtParams());
+        String extParams = packExtParams(request);
+        exist.setExtParams(extParams);
         exist.setModifier(opInfo.getName());
         if (InlongConstants.AFFECTED_ONE_ROW != 
clusterTagMapper.updateByIdSelective(exist)) {
             throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
index 44245cb0c4..d7386dda69 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
@@ -31,6 +31,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
+import org.apache.inlong.manager.dao.entity.InlongClusterTagEntity;
 import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
 import org.apache.inlong.manager.dao.entity.InlongGroupExtEntity;
 import org.apache.inlong.manager.dao.entity.InlongStreamExtEntity;
@@ -99,6 +100,7 @@ public class DataProxyConfigRepository implements 
IRepository {
     public static final String KEY_DATA_NODE_NAME = "defaultDataNodeName";
     public static final String KEY_SORT_CONSUMER_GROUP = 
"defaultSortConsumerGroup";
     public static final String KEY_SINK_NAME = "defaultSinkName";
+    public static final String KEY_INLONG_COMPRESS_TYPE = "inlongCompressType";
 
     public static final Splitter.MapSplitter MAP_SPLITTER = 
Splitter.on(SEPARATOR).trimResults()
             .withKeyValueSeparator(KEY_VALUE_SEPARATOR);
@@ -372,6 +374,15 @@ public class DataProxyConfigRepository implements 
IRepository {
         Map<String, InlongStreamId> streamIdMap = new HashMap<>();
         clusterSetMapper.selectInlongStreamId()
                 .forEach(v -> 
streamIdMap.put(getInlongId(v.getInlongGroupId(), v.getInlongStreamId()), v));
+
+        Map<String, InlongClusterTagEntity> clusterTagMap = new HashMap<>();
+        clusterSetMapper.selectInlongClusterTag().forEach(v -> 
clusterTagMap.put(v.getClusterTag(), v));
+        // reload inlong cluster tag ext params
+        Map<String, Map<String, String>> clusterTagParams = new HashMap<>();
+        clusterTagMap.forEach((k, v) -> {
+            Map<String, String> params = fromJsonToMap(v.getExtParams());
+            clusterTagParams.put(k, params);
+        });
         // reload inlong stream ext params
         Map<String, Map<String, String>> streamParams = new HashMap<>();
         streamIdMap.forEach((k, v) -> {
@@ -389,7 +400,7 @@ public class DataProxyConfigRepository implements 
IRepository {
 
         // build Map<clusterTag, List<InlongIdObject>>
         Map<String, List<InLongIdObject>> inlongIdMap = 
this.parseInlongId(groupIdMap, groupParams, streamIdMap,
-                streamParams);
+                streamParams, clusterTagParams);
         // mark inlong id to proxy cluster
         for (Entry<String, DataProxyCluster> entry : 
proxyClusterMap.entrySet()) {
             String clusterTag = 
entry.getValue().getProxyCluster().getSetName();
@@ -405,7 +416,7 @@ public class DataProxyConfigRepository implements 
IRepository {
      */
     private Map<String, List<InLongIdObject>> parseInlongId(Map<String, 
InlongGroupId> groupIdMap,
             Map<String, Map<String, String>> groupParams, Map<String, 
InlongStreamId> streamIdMap,
-            Map<String, Map<String, String>> streamParams) {
+            Map<String, Map<String, String>> streamParams, Map<String, 
Map<String, String>> clusterTagParams) {
         Map<String, List<InLongIdObject>> inlongIdMap = new HashMap<>();
         for (Entry<String, InlongStreamId> entry : streamIdMap.entrySet()) {
             InlongStreamId streamIdObj = entry.getValue();
@@ -427,6 +438,11 @@ public class DataProxyConfigRepository implements 
IRepository {
                 obj.setTopic(streamIdObj.getTopic());
                 obj.getParams().put(KEY_NAMESPACE, groupIdObj.getTopic());
             }
+            Map<String, String> tagParamMap = 
clusterTagParams.get(groupIdObj.getClusterTag());
+            if (tagParamMap != null && 
StringUtils.isNotBlank(tagParamMap.get(KEY_INLONG_COMPRESS_TYPE))) {
+                obj.getParams().put(KEY_INLONG_COMPRESS_TYPE, 
tagParamMap.get(KEY_INLONG_COMPRESS_TYPE));
+            }
+
             inlongIdMap.computeIfAbsent(groupIdObj.getClusterTag(), k -> new 
ArrayList<>()).add(obj);
             // backup
             InLongIdObject backupObj = new InLongIdObject();
@@ -445,6 +461,12 @@ public class DataProxyConfigRepository implements 
IRepository {
                 } else {
                     backupObj.setTopic(groupMqResource);
                 }
+                Map<String, String> backUpTagParamMap = 
clusterTagParams.get(groupIdObj.getClusterTag());
+                if (backUpTagParamMap != null
+                        && 
StringUtils.isNotBlank(backUpTagParamMap.get(KEY_INLONG_COMPRESS_TYPE))) {
+                    backupObj.getParams().put(KEY_INLONG_COMPRESS_TYPE,
+                            backUpTagParamMap.get(KEY_INLONG_COMPRESS_TYPE));
+                }
                 inlongIdMap.computeIfAbsent(clusterTag, k -> new 
ArrayList<>()).add(backupObj);
             }
         }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepositoryV2.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepositoryV2.java
index f9b5b6aa67..fda6e0810d 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepositoryV2.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepositoryV2.java
@@ -31,6 +31,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
+import org.apache.inlong.manager.dao.entity.InlongClusterTagEntity;
 import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
 import org.apache.inlong.manager.dao.entity.InlongGroupExtEntity;
 import org.apache.inlong.manager.dao.entity.InlongStreamExtEntity;
@@ -95,6 +96,7 @@ public class DataProxyConfigRepositoryV2 implements 
IRepository {
     public static final String KEY_DATA_NODE_NAME = "defaultDataNodeName";
     public static final String KEY_SORT_CONSUMER_GROUP = 
"defaultSortConsumerGroup";
     public static final String KEY_SINK_NAME = "defaultSinkName";
+    public static final String KEY_INLONG_COMPRESS_TYPE = "inlongCompressType";
 
     public static final Splitter.MapSplitter MAP_SPLITTER = 
Splitter.on(SEPARATOR).trimResults()
             .withKeyValueSeparator(KEY_VALUE_SEPARATOR);
@@ -258,8 +260,10 @@ public class DataProxyConfigRepositoryV2 implements 
IRepository {
             if (StringUtils.equalsIgnoreCase(producerTag, 
Boolean.TRUE.toString()) && StringUtils.isNotBlank(
                     cacheCluster.getClusterTags())) {
                 Set<String> clusterTags = 
Sets.newHashSet(cacheCluster.getClusterTags().split(InlongConstants.COMMA));
-                clusterTags.forEach(clusterTag -> 
cacheClusterMap.computeIfAbsent(clusterTag, k -> new HashMap<>())
-                        .computeIfAbsent(cacheCluster.getExtTag(), k -> new 
ArrayList<>()).add(cacheCluster));
+                clusterTags.forEach(clusterTag -> {
+                    cacheClusterMap.computeIfAbsent(clusterTag, k -> new 
HashMap<>())
+                            .computeIfAbsent(cacheCluster.getExtTag(), k -> 
new ArrayList<>()).add(cacheCluster);
+                });
             }
         }
         // mark cache cluster to proxy cluster
@@ -356,6 +360,15 @@ public class DataProxyConfigRepositoryV2 implements 
IRepository {
                 .loadGroupBackupInfo(ClusterSwitch.BACKUP_CLUSTER_TAG);
         groupExtCursor.forEach(v -> 
groupParams.computeIfAbsent(v.getInlongGroupId(), k -> new HashMap<>())
                 .put(ClusterSwitch.BACKUP_CLUSTER_TAG, v.getKeyValue()));
+
+        Map<String, InlongClusterTagEntity> clusterTagMap = new HashMap<>();
+        clusterSetMapper.selectInlongClusterTag().forEach(v -> 
clusterTagMap.put(v.getClusterTag(), v));
+        // reload inlong cluster tag ext params
+        Map<String, Map<String, String>> clusterTagParams = new HashMap<>();
+        clusterTagMap.forEach((k, v) -> {
+            Map<String, String> params = fromJsonToMap(v.getExtParams());
+            clusterTagParams.put(k, params);
+        });
         // reload inlong stream id
         Map<String, InlongStreamId> streamIdMap = new HashMap<>();
         clusterSetMapper.selectInlongStreamId()
@@ -376,7 +389,7 @@ public class DataProxyConfigRepositoryV2 implements 
IRepository {
 
         // build Map<clusterTag, List<InlongIdObject>>
         Map<String, List<InLongIdObject>> inlongIdMap = 
this.parseInlongId(groupIdMap, groupParams, streamIdMap,
-                streamParams);
+                streamParams, clusterTagParams);
         // mark inlong id to proxy cluster
         for (Entry<String, DataProxyCluster> entry : 
proxyClusterMap.entrySet()) {
             String clusterTag = 
entry.getValue().getProxyCluster().getSetName();
@@ -392,7 +405,7 @@ public class DataProxyConfigRepositoryV2 implements 
IRepository {
      */
     private Map<String, List<InLongIdObject>> parseInlongId(Map<String, 
InlongGroupId> groupIdMap,
             Map<String, Map<String, String>> groupParams, Map<String, 
InlongStreamId> streamIdMap,
-            Map<String, Map<String, String>> streamParams) {
+            Map<String, Map<String, String>> streamParams, Map<String, 
Map<String, String>> clusterTagParams) {
         Map<String, List<InLongIdObject>> inlongIdMap = new HashMap<>();
         for (Entry<String, InlongStreamId> entry : streamIdMap.entrySet()) {
             InlongStreamId streamIdObj = entry.getValue();
@@ -408,12 +421,18 @@ public class DataProxyConfigRepositoryV2 implements 
IRepository {
             obj.setInlongId(inlongId);
             Optional.ofNullable(groupParams.get(groupId)).ifPresent(v -> 
obj.getParams().putAll(v));
             Optional.ofNullable(streamParams.get(inlongId)).ifPresent(v -> 
obj.getParams().putAll(v));
+
             if (StringUtils.isBlank(streamIdObj.getTopic())) {
                 obj.setTopic(groupIdObj.getTopic());
             } else {
                 obj.setTopic(streamIdObj.getTopic());
                 obj.getParams().put(KEY_NAMESPACE, groupIdObj.getTopic());
             }
+            Map<String, String> clusterParamMap = 
clusterTagParams.get(groupIdObj.getClusterTag());
+            if (clusterParamMap != null && 
StringUtils.isNotBlank(clusterParamMap.get(KEY_INLONG_COMPRESS_TYPE))) {
+                obj.getParams().put(KEY_INLONG_COMPRESS_TYPE, 
clusterParamMap.get(KEY_INLONG_COMPRESS_TYPE));
+            }
+
             inlongIdMap.computeIfAbsent(groupIdObj.getClusterTag(), k -> new 
ArrayList<>()).add(obj);
             // backup
             InLongIdObject backupObj = new InLongIdObject();
@@ -432,6 +451,12 @@ public class DataProxyConfigRepositoryV2 implements 
IRepository {
                 } else {
                     backupObj.setTopic(groupMqResource);
                 }
+                Map<String, String> backUpTagParamMap = 
clusterTagParams.get(groupIdObj.getClusterTag());
+                if (backUpTagParamMap != null
+                        && 
StringUtils.isNotBlank(backUpTagParamMap.get(KEY_INLONG_COMPRESS_TYPE))) {
+                    backupObj.getParams().put(KEY_INLONG_COMPRESS_TYPE,
+                            backUpTagParamMap.get(KEY_INLONG_COMPRESS_TYPE));
+                }
                 inlongIdMap.computeIfAbsent(clusterTag, k -> new 
ArrayList<>()).add(backupObj);
             }
         }

Reply via email to