This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new a133995d1 [INLONG-5694][Manager] Fix the problem that gets the inlong 
group error (#5695)
a133995d1 is described below

commit a133995d1a9412cb31822ef737782afaf674d0ea
Author: fuweng11 <[email protected]>
AuthorDate: Fri Aug 26 12:51:20 2022 +0800

    [INLONG-5694][Manager] Fix the problem that gets the inlong group error 
(#5695)
---
 .../manager/plugin/listener/DeleteSortListener.java     | 12 ++++++------
 .../manager/plugin/listener/DeleteStreamListener.java   | 12 ++++++------
 .../manager/plugin/listener/RestartSortListener.java    | 14 +++++++-------
 .../manager/plugin/listener/RestartStreamListener.java  | 14 +++++++-------
 .../manager/plugin/listener/StartupStreamListener.java  | 14 +++++++-------
 .../manager/plugin/listener/SuspendSortListener.java    | 12 ++++++------
 .../manager/plugin/listener/SuspendStreamListener.java  | 12 ++++++------
 .../service/core/impl/ConsumptionServiceImpl.java       |  8 ++++----
 .../service/core/impl/SortSourceServiceImpl.java        | 17 +++++++++--------
 .../manager/service/group/InlongGroupServiceImpl.java   |  5 +++--
 .../workflow/core/impl/WorkflowQueryServiceImpl.java    |  5 +++--
 11 files changed, 64 insertions(+), 61 deletions(-)

diff --git 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
 
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
index f2dd23537..227674f6e 100644
--- 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
+++ 
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
@@ -23,21 +23,21 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
+import org.apache.inlong.manager.plugin.flink.FlinkOperation;
+import org.apache.inlong.manager.plugin.flink.FlinkService;
+import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import 
org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
 import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
-import org.apache.inlong.manager.plugin.flink.FlinkOperation;
-import org.apache.inlong.manager.plugin.flink.FlinkService;
-import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
 import org.apache.inlong.manager.workflow.event.task.TaskEvent;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 import static 
org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;
 
@@ -88,8 +88,8 @@ public class DeleteSortListener implements 
SortOperateListener {
         List<InlongGroupExtInfo> extList = inlongGroupInfo.getExtList();
         log.info("inlong group ext info: {}", extList);
 
-        Map<String, String> kvConf = extList.stream().collect(
-                Collectors.toMap(InlongGroupExtInfo::getKeyName, 
InlongGroupExtInfo::getKeyValue));
+        Map<String, String> kvConf = new HashMap<>();
+        extList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), 
groupExtInfo.getKeyValue()));
         String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
         if (StringUtils.isEmpty(sortExt)) {
             String message = String.format("delete sort failed for groupId 
[%s], as the sort properties is empty",
diff --git 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
 
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
index d54c6aa56..96c856781 100644
--- 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
+++ 
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
@@ -23,23 +23,23 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
+import org.apache.inlong.manager.plugin.flink.FlinkOperation;
+import org.apache.inlong.manager.plugin.flink.FlinkService;
+import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
 import 
org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
-import org.apache.inlong.manager.plugin.flink.FlinkOperation;
-import org.apache.inlong.manager.plugin.flink.FlinkService;
-import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
 import org.apache.inlong.manager.workflow.event.task.TaskEvent;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 import static 
org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;
 
@@ -89,8 +89,8 @@ public class DeleteStreamListener implements 
SortOperateListener {
         log.info("inlong stream :{} ext info: {}", 
streamInfo.getInlongStreamId(), streamExtList);
         final String groupId = streamInfo.getInlongGroupId();
         final String streamId = streamInfo.getInlongStreamId();
-        Map<String, String> kvConf = groupExtList.stream().collect(
-                Collectors.toMap(InlongGroupExtInfo::getKeyName, 
InlongGroupExtInfo::getKeyValue));
+        Map<String, String> kvConf = new HashMap<>();
+        groupExtList.forEach(groupExtInfo -> 
kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue()));
         streamExtList.forEach(extInfo -> {
             kvConf.put(extInfo.getKeyName(), extInfo.getKeyValue());
         });
diff --git 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
 
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
index fc4ef876c..2b2218b52 100644
--- 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
+++ 
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
@@ -23,22 +23,22 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
-import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
-import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
-import 
org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
-import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
 import org.apache.inlong.manager.plugin.flink.FlinkOperation;
 import org.apache.inlong.manager.plugin.flink.FlinkService;
 import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
 import org.apache.inlong.manager.plugin.flink.enums.Constants;
+import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
+import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
+import 
org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
+import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
 import org.apache.inlong.manager.workflow.event.task.TaskEvent;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 import static 
org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;
 
@@ -89,8 +89,8 @@ public class RestartSortListener implements 
SortOperateListener {
         List<InlongGroupExtInfo> extList = inlongGroupInfo.getExtList();
         log.info("inlong group ext info: {}", extList);
 
-        Map<String, String> kvConf = extList.stream().collect(
-                Collectors.toMap(InlongGroupExtInfo::getKeyName, 
InlongGroupExtInfo::getKeyValue));
+        Map<String, String> kvConf = new HashMap<>();
+        extList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), 
groupExtInfo.getKeyValue()));
         String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
         if (StringUtils.isEmpty(sortExt)) {
             String message = String.format("restart sort failed for groupId 
[%s], as the sort properties is empty",
diff --git 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
 
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
index cdf570e5e..ab94de669 100644
--- 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
+++ 
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
@@ -23,24 +23,24 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
+import org.apache.inlong.manager.plugin.flink.FlinkOperation;
+import org.apache.inlong.manager.plugin.flink.FlinkService;
+import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
+import org.apache.inlong.manager.plugin.flink.enums.Constants;
 import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
 import 
org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
-import org.apache.inlong.manager.plugin.flink.FlinkOperation;
-import org.apache.inlong.manager.plugin.flink.FlinkService;
-import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
-import org.apache.inlong.manager.plugin.flink.enums.Constants;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
 import org.apache.inlong.manager.workflow.event.task.TaskEvent;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 import static 
org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;
 
@@ -90,8 +90,8 @@ public class RestartStreamListener implements 
SortOperateListener {
         log.info("inlong stream :{} ext info: {}", 
streamInfo.getInlongStreamId(), streamExtList);
         final String groupId = streamInfo.getInlongGroupId();
         final String streamId = streamInfo.getInlongStreamId();
-        Map<String, String> kvConf = groupExtList.stream().collect(
-                Collectors.toMap(InlongGroupExtInfo::getKeyName, 
InlongGroupExtInfo::getKeyValue));
+        Map<String, String> kvConf = new HashMap<>();
+        groupExtList.forEach(groupExtInfo -> 
kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue()));
         streamExtList.stream().forEach(extInfo -> {
             kvConf.put(extInfo.getKeyName(), extInfo.getKeyValue());
         });
diff --git 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java
 
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java
index 9c37449f2..1ee9deffb 100644
--- 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java
+++ 
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java
@@ -24,24 +24,24 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
+import org.apache.inlong.manager.plugin.flink.FlinkOperation;
+import org.apache.inlong.manager.plugin.flink.FlinkService;
+import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
+import org.apache.inlong.manager.plugin.flink.enums.Constants;
 import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
 import 
org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
-import org.apache.inlong.manager.plugin.flink.FlinkOperation;
-import org.apache.inlong.manager.plugin.flink.FlinkService;
-import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
-import org.apache.inlong.manager.plugin.flink.enums.Constants;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
 import org.apache.inlong.manager.workflow.event.task.TaskEvent;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 import static 
org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;
 
@@ -97,8 +97,8 @@ public class StartupStreamListener implements 
SortOperateListener {
             return ListenerResult.success();
         }
 
-        Map<String, String> kvConf = groupExtList.stream().collect(
-                Collectors.toMap(InlongGroupExtInfo::getKeyName, 
InlongGroupExtInfo::getKeyValue));
+        Map<String, String> kvConf = new HashMap<>();
+        groupExtList.forEach(groupExtInfo -> 
kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue()));
         streamExtList.forEach(extInfo -> {
             kvConf.put(extInfo.getKeyName(), extInfo.getKeyValue());
         });
diff --git 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
 
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
index 88428c118..59eb35e32 100644
--- 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
+++ 
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
@@ -23,21 +23,21 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
+import org.apache.inlong.manager.plugin.flink.FlinkOperation;
+import org.apache.inlong.manager.plugin.flink.FlinkService;
+import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import 
org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
 import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
-import org.apache.inlong.manager.plugin.flink.FlinkOperation;
-import org.apache.inlong.manager.plugin.flink.FlinkService;
-import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
 import org.apache.inlong.manager.workflow.event.task.TaskEvent;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 import static 
org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;
 
@@ -88,8 +88,8 @@ public class SuspendSortListener implements 
SortOperateListener {
         List<InlongGroupExtInfo> extList = inlongGroupInfo.getExtList();
         log.info("inlong group ext info: {}", extList);
 
-        Map<String, String> kvConf = extList.stream().collect(
-                Collectors.toMap(InlongGroupExtInfo::getKeyName, 
InlongGroupExtInfo::getKeyValue));
+        Map<String, String> kvConf = new HashMap<>();
+        extList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), 
groupExtInfo.getKeyValue()));
         String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
         if (StringUtils.isEmpty(sortExt)) {
             String message = String.format("suspend sort failed for groupId 
[%s], as the sort properties is empty",
diff --git 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java
 
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java
index c3b2e21ad..f1e13ddf6 100644
--- 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java
+++ 
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java
@@ -23,23 +23,23 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
+import org.apache.inlong.manager.plugin.flink.FlinkOperation;
+import org.apache.inlong.manager.plugin.flink.FlinkService;
+import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
 import 
org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
-import org.apache.inlong.manager.plugin.flink.FlinkOperation;
-import org.apache.inlong.manager.plugin.flink.FlinkService;
-import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
 import org.apache.inlong.manager.workflow.event.task.TaskEvent;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 import static 
org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;
 
@@ -89,8 +89,8 @@ public class SuspendStreamListener implements 
SortOperateListener {
         log.info("inlong stream :{} ext info: {}", 
streamInfo.getInlongStreamId(), streamExtList);
         final String groupId = streamInfo.getInlongGroupId();
         final String streamId = streamInfo.getInlongStreamId();
-        Map<String, String> kvConf = groupExtList.stream().collect(
-                Collectors.toMap(InlongGroupExtInfo::getKeyName, 
InlongGroupExtInfo::getKeyValue));
+        Map<String, String> kvConf = new HashMap<>();
+        groupExtList.forEach(groupExtInfo -> 
kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue()));
         streamExtList.stream().forEach(extInfo -> {
             kvConf.put(extInfo.getKeyName(), extInfo.getKeyValue());
         });
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
index e7a1c9e12..8a899a390 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
@@ -36,7 +36,6 @@ import 
org.apache.inlong.manager.dao.mapper.ConsumptionEntityMapper;
 import org.apache.inlong.manager.dao.mapper.ConsumptionPulsarEntityMapper;
 import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
 import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
-import org.apache.inlong.manager.pojo.common.CountInfo;
 import org.apache.inlong.manager.pojo.common.PageResult;
 import org.apache.inlong.manager.pojo.consumption.ConsumptionInfo;
 import org.apache.inlong.manager.pojo.consumption.ConsumptionListVo;
@@ -62,6 +61,7 @@ import org.springframework.util.CollectionUtils;
 
 import java.util.Arrays;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
@@ -98,9 +98,9 @@ public class ConsumptionServiceImpl implements 
ConsumptionService {
 
     @Override
     public ConsumptionSummary getSummary(ConsumptionQuery query) {
-        Map<String, Integer> countMap = consumptionMapper.countByQuery(query)
-                .stream()
-                .collect(Collectors.toMap(CountInfo::getKey, 
CountInfo::getValue));
+        Map<String, Integer> countMap = new HashMap<>();
+        consumptionMapper.countByQuery(query)
+                .forEach(countInfo -> countMap.put(countInfo.getKey(), 
countInfo.getValue()));
 
         return ConsumptionSummary.builder()
                 .totalCount(countMap.values().stream().mapToInt(c -> c).sum())
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
index dc51b6e9f..0510ce2f1 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
@@ -42,6 +42,7 @@ import 
org.springframework.transaction.annotation.Transactional;
 import javax.annotation.PostConstruct;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -235,11 +236,11 @@ public class SortSourceServiceImpl implements 
SortSourceService {
 
             task2Group.forEach((task, groupList) -> {
                 // get topic properties under this cluster and task, group 
them by group id.
-                Map<String, Map<String, String>> group2topicProp = 
allStreamInfos.stream()
-                        .filter(stream -> stream.getSortTaskName().equals(task)
-                                && 
stream.getSortClusterName().equals(clusterName))
-                        
.collect(Collectors.toMap(SortSourceStreamInfo::getGroupId,
-                                SortSourceStreamInfo::getExtParamsMap));
+                Map<String, Map<String, String>> group2topicProp = new 
HashMap<>();
+                allStreamInfos.stream().filter(stream -> 
stream.getSortTaskName().equals(task)
+                        && 
stream.getSortClusterName().equals(clusterName)).forEach(
+                        sortSourceStreamInfo -> 
group2topicProp.put(sortSourceStreamInfo.getGroupId(),
+                                sortSourceStreamInfo.getExtParamsMap()));
 
                 Map<String, CacheZone> cacheZones;
                 try {
@@ -317,9 +318,9 @@ public class SortSourceServiceImpl implements 
SortSourceService {
         List<String> tags = new ArrayList<>(tag2GroupInfos.keySet());
 
         // Clusters that related to these tags
-        Map<String, List<SortSourceClusterInfo>> tag2ClusterInfos = 
allTag2ClusterInfos.entrySet().stream()
-                .filter(entry -> tag2GroupInfos.containsKey(entry.getKey()))
-                .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+        Map<String, List<SortSourceClusterInfo>> tag2ClusterInfos = new 
HashMap<>();
+        allTag2ClusterInfos.entrySet().stream().filter(entry -> 
tag2GroupInfos.containsKey(entry.getKey()))
+                .forEach(entry -> tag2ClusterInfos.put(entry.getKey(), 
entry.getValue()));
 
         // get CacheZone list
         return tags.stream()
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
index 65aecd2e3..d9c5f1c63 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
@@ -70,6 +70,7 @@ import org.springframework.validation.annotation.Validated;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -422,8 +423,8 @@ public class InlongGroupServiceImpl implements 
InlongGroupService {
     }
 
     private BaseSortConf buildSortConfig(List<InlongGroupExtInfo> extInfos) {
-        Map<String, String> extMap = extInfos.stream()
-                .collect(Collectors.toMap(InlongGroupExtInfo::getKeyName, 
InlongGroupExtInfo::getKeyValue));
+        Map<String, String> extMap = new HashMap<>();
+        extInfos.forEach(extInfo -> extMap.put(extInfo.getKeyName(), 
extInfo.getKeyValue()));
         String type = extMap.get(InlongConstants.SORT_TYPE);
         if (StringUtils.isBlank(type)) {
             return null;
diff --git 
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/WorkflowQueryServiceImpl.java
 
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/WorkflowQueryServiceImpl.java
index b34bc8e83..5eac67e94 100644
--- 
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/WorkflowQueryServiceImpl.java
+++ 
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/WorkflowQueryServiceImpl.java
@@ -62,6 +62,7 @@ import org.springframework.stereotype.Service;
 
 import java.util.Arrays;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -118,8 +119,8 @@ public class WorkflowQueryServiceImpl implements 
WorkflowQueryService {
     public ProcessCountResponse countProcess(ProcessCountRequest request) {
         List<CountInfo> result = processEntityMapper.countByQuery(request);
 
-        Map<String, Integer> countByState = result.stream()
-                .collect(Collectors.toMap(CountInfo::getKey, 
CountInfo::getValue));
+        Map<String, Integer> countByState = new HashMap<>();
+        result.forEach(countInfo -> countByState.put(countInfo.getKey(), 
countInfo.getValue()));
 
         return ProcessCountResponse.builder()
                 .totalApplyCount(countByState.values().stream().mapToInt(c -> 
c).sum())

Reply via email to