This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new de894ed9d [INLONG-6827][Manager] Optimize the config management of
SortStandalone (#6828)
de894ed9d is described below
commit de894ed9d56720052b66721bbe421e1bc0b263ae
Author: vernedeng <[email protected]>
AuthorDate: Wed Dec 14 12:39:05 2022 +0800
[INLONG-6827][Manager] Optimize the config management of SortStandalone
(#6828)
---
.../service/core/impl/SortClusterServiceImpl.java | 69 ++++++++++++++--------
1 file changed, 43 insertions(+), 26 deletions(-)
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java
index e55aa3ea8..f3541316f 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java
@@ -47,6 +47,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -107,9 +108,9 @@ public class SortClusterServiceImpl implements
SortClusterService {
public void reload() {
LOGGER.debug("start to reload sort config");
try {
- reloadAllClusterConfigV2();
+ reloadAllClusterConfig();
} catch (Throwable t) {
- LOGGER.error(t.getMessage(), t);
+ LOGGER.error("fail to reload cluster config", t);
}
LOGGER.debug("end to reload config");
}
@@ -161,7 +162,7 @@ public class SortClusterServiceImpl implements
SortClusterService {
.build();
}
- private void reloadAllClusterConfigV2() {
+ private void reloadAllClusterConfig() {
// load all fields info
List<SortFieldInfo> fieldInfos = sortConfigLoader.loadAllFields();
fieldMap = new HashMap<>();
@@ -174,13 +175,16 @@ public class SortClusterServiceImpl implements
SortClusterService {
// get all task under a given cluster, has been reduced into cluster
and task.
List<SortTaskInfo> tasks = sortConfigLoader.loadAllTask();
Map<String, List<SortTaskInfo>> clusterTaskMap = tasks.stream()
- .filter(dto -> dto.getSortClusterName() != null)
+ .filter(dto -> StringUtils.isNotBlank(dto.getSortClusterName())
+ && StringUtils.isNotBlank(dto.getSortTaskName())
+ && StringUtils.isNotBlank(dto.getDataNodeName())
+ && StringUtils.isNotBlank(dto.getSinkType()))
.collect(Collectors.groupingBy(SortTaskInfo::getSortClusterName));
// get all stream sinks
Map<String, List<StreamSinkEntity>> task2AllStreams =
sinkEntities.stream()
.filter(entity ->
StringUtils.isNotBlank(entity.getInlongClusterName()))
- .collect(Collectors.groupingBy(StreamSinkEntity::getSinkName));
+
.collect(Collectors.groupingBy(StreamSinkEntity::getSortTaskName));
// get all data nodes and group by node name
List<DataNodeEntity> dataNodeEntities =
sortConfigLoader.loadAllDataNodeEntity();
@@ -199,7 +203,7 @@ public class SortClusterServiceImpl implements
SortClusterService {
clusterTaskMap.forEach((clusterName, taskList) -> {
try {
- SortClusterConfig config =
this.getConfigByClusterNameV2(clusterName,
+ SortClusterConfig config =
this.getConfigByClusterName(clusterName,
taskList, task2AllStreams, task2DataNodeMap);
String jsonStr = GSON.toJson(config);
String md5 = DigestUtils.md5Hex(jsonStr);
@@ -207,8 +211,9 @@ public class SortClusterServiceImpl implements
SortClusterService {
newMd5Map.put(clusterName, md5);
} catch (Throwable e) {
// if get config failed, update the err log.
- newErrorLogMap.put(clusterName, e.getMessage());
- LOGGER.error("Failed to update cluster config={}, error={}",
clusterName, e.getMessage());
+ String errMsg =
Optional.ofNullable(e.getMessage()).orElse("Unknown error, please check logs");
+ newErrorLogMap.put(clusterName, errMsg);
+ LOGGER.error("Failed to update cluster config={}",
clusterName, e);
}
});
@@ -217,7 +222,7 @@ public class SortClusterServiceImpl implements
SortClusterService {
sortClusterMd5Map = newMd5Map;
}
- private SortClusterConfig getConfigByClusterNameV2(
+ private SortClusterConfig getConfigByClusterName(
String clusterName,
List<SortTaskInfo> tasks,
Map<String, List<StreamSinkEntity>> task2AllStreams,
@@ -225,19 +230,24 @@ public class SortClusterServiceImpl implements
SortClusterService {
List<SortTaskConfig> taskConfigs = tasks.stream()
.map(task -> {
- String taskName = task.getSortTaskName();
- String type = task.getSinkType();
- String dataNodeName = task.getDataNodeName();
- DataNodeInfo nodeInfo = task2DataNodeMap.get(dataNodeName);
- List<StreamSinkEntity> streams =
task2AllStreams.get(taskName);
-
- return SortTaskConfig.builder()
- .name(taskName)
- .type(type)
- .idParams(this.parseIdParamsV2(streams))
- .sinkParams(this.parseSinkParamsV2(nodeInfo))
- .build();
+ try {
+ String taskName = task.getSortTaskName();
+ String type = task.getSinkType();
+ String dataNodeName = task.getDataNodeName();
+ DataNodeInfo nodeInfo =
task2DataNodeMap.get(dataNodeName);
+ List<StreamSinkEntity> streams =
task2AllStreams.get(taskName);
+ return SortTaskConfig.builder()
+ .name(taskName)
+ .type(type)
+ .idParams(this.parseIdParams(streams))
+ .sinkParams(this.parseSinkParams(nodeInfo))
+ .build();
+ } catch (Exception e) {
+ LOGGER.error("fail to parse sort task config of
cluster={}", clusterName, e);
+ return null;
+ }
})
+ .filter(Objects::nonNull)
.collect(Collectors.toList());
return SortClusterConfig.builder()
@@ -246,18 +256,25 @@ public class SortClusterServiceImpl implements
SortClusterService {
.build();
}
- private List<Map<String, String>> parseIdParamsV2(List<StreamSinkEntity>
streams) {
+ private List<Map<String, String>> parseIdParams(List<StreamSinkEntity>
streams) {
return streams.stream()
.map(streamSink -> {
- StreamSinkOperator operator =
sinkOperatorFactory.getInstance(streamSink.getSinkType());
- List<String> fields =
fieldMap.get(streamSink.getInlongGroupId());
- return operator.parse2IdParams(streamSink, fields);
+ try {
+ StreamSinkOperator operator =
sinkOperatorFactory.getInstance(streamSink.getSinkType());
+ List<String> fields =
fieldMap.get(streamSink.getInlongGroupId());
+ return operator.parse2IdParams(streamSink, fields);
+ } catch (Exception e) {
+ LOGGER.error("fail to parse id params of groupId={},
streamId={} name={}, type={}}",
+ streamSink.getInlongGroupId(),
streamSink.getInlongStreamId(),
+ streamSink.getSinkName(),
streamSink.getSinkType(), e);
+ return null;
+ }
})
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
- private Map<String, String> parseSinkParamsV2(DataNodeInfo nodeInfo) {
+ private Map<String, String> parseSinkParams(DataNodeInfo nodeInfo) {
DataNodeOperator operator =
dataNodeOperatorFactory.getInstance(nodeInfo.getType());
return operator.parse2SinkParams(nodeInfo);
}