This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 3fdadcbefd [INLONG-10279][Manager] Support compress unified configuration (#10280)
3fdadcbefd is described below
commit 3fdadcbefdf629d241257004e7910551cf448c2b
Author: vernedeng <[email protected]>
AuthorDate: Sat Jun 15 11:17:34 2024 +0800
[INLONG-10279][Manager] Support compress unified configuration (#10280)
---
.../org/apache/inlong/common/pojo/sort/SortConfigResponse.java | 2 +-
.../apache/inlong/manager/service/core/impl/SortServiceImpl.java | 7 ++++---
.../config/loader/v2/ManagerSortClusterConfigLoader.java | 5 ++++-
3 files changed, 9 insertions(+), 5 deletions(-)
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortConfigResponse.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortConfigResponse.java
index 327898ede0..f30b508f52 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortConfigResponse.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortConfigResponse.java
@@ -36,6 +36,6 @@ public class SortConfigResponse {
String msg;
int code;
String md5;
- String data;
+ byte[] data;
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
index d820e37129..597c934252 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
@@ -27,6 +27,7 @@ import org.apache.inlong.common.pojo.sort.mq.MqClusterConfig;
import org.apache.inlong.common.pojo.sort.mq.PulsarClusterConfig;
import org.apache.inlong.common.pojo.sort.node.NodeConfig;
import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
+import org.apache.inlong.common.util.Utils;
import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.plugin.Plugin;
@@ -102,7 +103,7 @@ public class SortServiceImpl implements SortService, PluginBinder {
/**
* key 1: sort cluster name, value : sort config
*/
- private Map<String, String> sortConfigMap = new ConcurrentHashMap<>();
+ private Map<String, byte[]> sortConfigMap = new ConcurrentHashMap<>();
/**
* key 1: sort cluster name, value : md5
*/
@@ -265,7 +266,7 @@ public class SortServiceImpl implements SortService, PluginBinder {
private void reloadDataFlowConfig() {
ObjectMapper objectMapper = new ObjectMapper();
- Map<String, String> sortConfigs = new HashMap<>();
+ Map<String, byte[]> sortConfigs = new HashMap<>();
Map<String, String> sortConfigMd5s = new HashMap<>();
Map<String, List<SortTaskConfig>> temp = new HashMap<>();
List<SortConfigEntity> sinkConfigEntityList = configLoader.loadAllSortConfigEntity();
@@ -308,7 +309,7 @@ public class SortServiceImpl implements SortService, PluginBinder {
sortConfig.setTasks(temp.get(sortClusterName));
try {
String configStr = objectMapper.writeValueAsString(sortConfig);
- sortConfigs.put(sortClusterName, configStr);
+ sortConfigs.put(sortClusterName, Utils.compressGZip(configStr.getBytes()));
String md5 = DigestUtils.md5Hex(configStr);
sortConfigMd5s.put(sortClusterName, md5);
} catch (Exception e) {
diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/v2/ManagerSortClusterConfigLoader.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/v2/ManagerSortClusterConfigLoader.java
index 7882e4e310..44b55ee4c8 100644
--- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/v2/ManagerSortClusterConfigLoader.java
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/v2/ManagerSortClusterConfigLoader.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sort.standalone.config.loader.v2;
import org.apache.inlong.common.pojo.sort.SortConfig;
import org.apache.inlong.common.pojo.sort.SortConfigResponse;
+import org.apache.inlong.common.util.Utils;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import org.apache.inlong.sort.standalone.config.holder.ManagerUrlHandler;
@@ -88,7 +89,9 @@ public class ManagerSortClusterConfigLoader implements SortConfigLoader {
}
this.md5 = clusterResponse.getMd5();
- return objectMapper.readValue(clusterResponse.getData(), SortConfig.class);
+ byte[] decompress = Utils
+ .gzipDecompress(clusterResponse.getData(), 0, clusterResponse.getData().length);
+ return objectMapper.readValue(decompress, SortConfig.class);
} catch (Exception ex) {
log.error("exception caught", ex);
return null;