fuweng11 commented on code in PR #12099:
URL: https://github.com/apache/inlong/pull/12099#discussion_r2870349334


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java:
##########
@@ -168,83 +173,138 @@ public List<AuditVO> listByCondition(AuditRequest 
request) throws Exception {
 
         // for now, we use the first sink type only.
         // this is temporary behavior before multiple sinks in one stream is 
fully supported.
-        String sinkNodeType = null;
-        String sourceNodeType = null;
-        Integer sinkId = request.getSinkId();
-        StreamSinkEntity sinkEntity = null;
-        if (StringUtils.isNotBlank(streamId)) {
-            List<StreamSinkEntity> sinkEntityList = 
sinkEntityMapper.selectByRelatedId(groupId, streamId);
-            if (sinkId != null) {
-                sinkEntity = sinkEntityMapper.selectByPrimaryKey(sinkId);
-            } else if (CollectionUtils.isNotEmpty(sinkEntityList)) {
-                sinkEntity = sinkEntityList.get(0);
-            }
-            // if sink info is existed, get sink type for query audit info.
-            if (sinkEntity != null) {
-                sinkNodeType = sinkEntity.getSinkType();
-            }
-        } else {
-            sinkNodeType = request.getSinkType();
+        String sinkNodeType = request.getSinkType();
+        // if sinkNodeType is not blank, should use directly
+        if (StringUtils.isBlank(sinkNodeType) && 
Boolean.TRUE.equals(request.getNeedSinkAudit())) {
+            sinkNodeType = getSinkNodeType(request.getSinkId(), groupId, 
streamId);
         }
 
-        Map<String, String> auditIdMap = new HashMap<>();
+        // key: auditId, value: nodeType
+        Map<String, String> auditId2NodeTypeMap = new HashMap<>(8);
+        fillAuditId2SinkNodeTypeMap(auditId2NodeTypeMap, sinkNodeType);
+
+        // set sourceNodeType is sinkNodeType firstly
+        String sourceNodeType = request.getSourceType();
+        if (StringUtils.isBlank(sourceNodeType)) {
+            sourceNodeType = sinkNodeType;
+        }
 
         if (StringUtils.isNotBlank(groupId)) {
-            InlongGroupEntity groupEntity = 
inlongGroupMapper.selectByGroupId(groupId);
-            List<StreamSourceEntity> sourceEntityList = 
sourceEntityMapper.selectByRelatedId(groupId, streamId, null);
-            if (CollectionUtils.isNotEmpty(sourceEntityList)) {
-                sourceNodeType = sourceEntityList.get(0).getSourceType();
+            List<StreamSourceEntity> sourceList = null;
+            if (StringUtils.isNotBlank(streamId)) {
+                // if sourceNodeType is blank, get sourceNodeType by groupId 
and streamId from StreamSourceEntity
+                if (StringUtils.isBlank(sourceNodeType) && 
Boolean.TRUE.equals(request.getNeedSourceAudit())) {
+                    sourceList = sourceEntityMapper.selectByRelatedId(groupId, 
streamId, null);
+                    if (CollectionUtils.isNotEmpty(sourceList)) {
+                        sourceNodeType = sourceList.get(0).getSourceType();
+                        fillAuditId2SourceNodeTypeMap(auditId2NodeTypeMap, 
sourceNodeType);
+                    }
+                }
             }
 
-            auditIdMap.put(getAuditId(sinkNodeType, 
IndicatorType.SEND_SUCCESS), sinkNodeType);
-
             if (CollectionUtils.isEmpty(request.getAuditIds())) {
-                // properly overwrite audit ids by role and stream config
-                if 
(InlongConstants.DATASYNC_REALTIME_MODE.equals(groupEntity.getInlongGroupMode())
-                        || 
InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupEntity.getInlongGroupMode())) 
{
-                    List<AuditInformation> cdcAuditInfoList =
-                            getCdcAuditInfoList(sourceNodeType, 
IndicatorType.RECEIVED_SUCCESS);
-                    List<String> cdcAuditIdList =
-                            cdcAuditInfoList.stream().map(v -> 
String.valueOf(v.getAuditId()))
-                                    .collect(Collectors.toList());
-                    if (CollectionUtils.isNotEmpty(cdcAuditIdList)) {
-                        String tempSourceNodeType = sourceNodeType;
-                        cdcAuditIdList.forEach(v -> auditIdMap.put(v, 
tempSourceNodeType));
-                    }
-                    auditIdMap.put(getAuditId(sourceNodeType, 
IndicatorType.RECEIVED_SUCCESS), sourceNodeType);
-                    request.setAuditIds(getAuditIds(groupId, streamId, 
sourceNodeType, sinkNodeType));
-                } else {
-                    auditIdMap.put(getAuditId(sinkNodeType, 
IndicatorType.RECEIVED_SUCCESS), sinkNodeType);
-                    request.setAuditIds(getAuditIds(groupId, streamId, null, 
sinkNodeType));
-                }
+                InlongGroupEntity groupEntity = 
inlongGroupMapper.selectByGroupId(groupId);

Review Comment:
   SourceAudit is required only when inlongGroupMode is DATASYNC_REALTIME_MODE; 
it is not necessary to use SourceAudit in all cases.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to