This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new a68f4bcd34 [INLONG-9886][Manager] Optimized code DataProxyConfigRepositoryV2 (#9888)
a68f4bcd34 is described below
commit a68f4bcd34d8bba8cfb1c113debe69311aa242e8
Author: balloon72 <[email protected]>
AuthorDate: Thu Mar 28 20:45:49 2024 +0800
[INLONG-9886][Manager] Optimized code DataProxyConfigRepositoryV2 (#9888)
Co-authored-by: hanmo1 <ISFA-9844>
---
.../repository/DataProxyConfigRepositoryV2.java | 27 ++++++----------------
1 file changed, 7 insertions(+), 20 deletions(-)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepositoryV2.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepositoryV2.java
index e169efc9b7..f9b5b6aa67 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepositoryV2.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepositoryV2.java
@@ -258,10 +258,8 @@ public class DataProxyConfigRepositoryV2 implements IRepository {
if (StringUtils.equalsIgnoreCase(producerTag,
Boolean.TRUE.toString()) && StringUtils.isNotBlank(
cacheCluster.getClusterTags())) {
Set<String> clusterTags =
Sets.newHashSet(cacheCluster.getClusterTags().split(InlongConstants.COMMA));
- clusterTags.forEach(clusterTag -> {
- cacheClusterMap.computeIfAbsent(clusterTag, k -> new
HashMap<>())
- .computeIfAbsent(cacheCluster.getExtTag(), k ->
new ArrayList<>()).add(cacheCluster);
- });
+ clusterTags.forEach(clusterTag ->
cacheClusterMap.computeIfAbsent(clusterTag, k -> new HashMap<>())
+ .computeIfAbsent(cacheCluster.getExtTag(), k -> new
ArrayList<>()).add(cacheCluster));
}
}
// mark cache cluster to proxy cluster
@@ -530,9 +528,7 @@ public class DataProxyConfigRepositoryV2 implements IRepository {
Map<String, InlongClusterEntity> clusterMap = new HashMap<>();
ClusterPageRequest clusterRequest = new ClusterPageRequest();
List<InlongClusterEntity> clusters =
clusterMapper.selectByCondition(clusterRequest);
- clusters.forEach((v) -> {
- clusterMap.put(v.getName(), v);
- });
+ clusters.forEach(v -> clusterMap.put(v.getName(), v));
// prepare stream sink
SinkPageRequest request = new SinkPageRequest();
request.setInlongGroupId(inlongGroupId);
@@ -541,10 +537,7 @@ public class DataProxyConfigRepositoryV2 implements IRepository {
for (StreamSinkEntity streamSink : streamSinks) {
String clusterName = streamSink.getInlongClusterName();
InlongClusterEntity cluster = clusterMap.get(clusterName);
- if (cluster == null) {
- continue;
- }
- if (!StringUtils.equals(oldClusterTag,
cluster.getClusterTags())) {
+ if (cluster == null || !StringUtils.equals(oldClusterTag,
cluster.getClusterTags())) {
continue;
}
String clusterType = cluster.getType();
@@ -556,9 +549,7 @@ public class DataProxyConfigRepositoryV2 implements IRepository {
}
}
// update
- newStreamSinks.forEach((v) -> {
- streamSinkMapper.insert(v);
- });
+ newStreamSinks.forEach(v -> streamSinkMapper.insert(v));
int rowCount =
inlongGroupMapper.updateByIdentifierSelective(newGroup);
if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
LOGGER.error("inlong group has already updated with group
id={}, curVersion={}",
@@ -675,9 +666,7 @@ public class DataProxyConfigRepositoryV2 implements IRepository {
Map<String, InlongClusterEntity> clusterMap = new HashMap<>();
ClusterPageRequest clusterRequest = new ClusterPageRequest();
List<InlongClusterEntity> clusters =
clusterMapper.selectByCondition(clusterRequest);
- clusters.forEach((v) -> {
- clusterMap.put(v.getName(), v);
- });
+ clusters.forEach(v -> clusterMap.put(v.getName(), v));
// prepare stream sink
SinkPageRequest request = new SinkPageRequest();
request.setInlongGroupId(inlongGroupId);
@@ -694,9 +683,7 @@ public class DataProxyConfigRepositoryV2 implements IRepository {
}
}
// delete old stream sink
- deleteStreamSinks.forEach((v) -> {
- streamSinkMapper.deleteById(v.getId());
- });
+ deleteStreamSinks.forEach(v -> streamSinkMapper.deleteById(v.getId()));
return inlongGroupId;
}
}