This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new e83699433d [INLONG-8771][Manager] Fix audit data error and add source type for DataSync (#8772)
e83699433d is described below

commit e83699433d00dfdc746a9e10018ff2deb7a5744f
Author: haifxu <[email protected]>
AuthorDate: Mon Aug 21 12:35:14 2023 +0800

    [INLONG-8771][Manager] Fix audit data error and add source type for DataSync (#8772)
---
 .../manager/service/core/impl/AuditServiceImpl.java      | 16 +++++++++++++---
 1 file changed, 13 insertions(+), 3 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
index 8a526caa4a..36b7cb0043 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
@@ -28,11 +28,13 @@ import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.AuditBaseEntity;
 import org.apache.inlong.manager.dao.entity.AuditSourceEntity;
+import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
 import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
 import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
 import org.apache.inlong.manager.dao.mapper.AuditBaseEntityMapper;
 import org.apache.inlong.manager.dao.mapper.AuditEntityMapper;
 import org.apache.inlong.manager.dao.mapper.AuditSourceEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
 import org.apache.inlong.manager.pojo.audit.AuditInfo;
@@ -46,6 +48,7 @@ import org.apache.inlong.manager.service.core.AuditService;
 import org.apache.inlong.manager.service.resource.sink.ck.ClickHouseConfig;
 import org.apache.inlong.manager.service.resource.sink.es.ElasticsearchApi;
 
+import com.google.common.collect.Sets;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.ibatis.jdbc.SQL;
@@ -135,6 +138,8 @@ public class AuditServiceImpl implements AuditService {
     private ClickHouseConfig config;
     @Autowired
     private AuditSourceEntityMapper auditSourceMapper;
+    @Autowired
+    private InlongGroupEntityMapper inlongGroupMapper;
 
     @PostConstruct
     public void initialize() {
@@ -248,6 +253,8 @@ public class AuditServiceImpl implements AuditService {
             sinkNodeType = sinkEntity.getSinkType();
         }
 
+        Set<String> sinkAuditIds = Sets.newHashSet(getAuditId(sinkNodeType, 
true), getAuditId(sinkNodeType, false));
+
         // properly overwrite audit ids by role and stream config
         request.setAuditIds(getAuditIds(groupId, streamId, sinkNodeType));
 
@@ -265,11 +272,10 @@ public class AuditServiceImpl implements AuditService {
                     AuditInfo vo = new AuditInfo();
                     vo.setLogTs((String) s.get("logTs"));
                     vo.setCount(((BigDecimal) s.get("total")).longValue());
-                    vo.setCount(((BigDecimal) 
s.get("totalDelay")).longValue());
+                    vo.setDelay(((BigDecimal) 
s.get("totalDelay")).longValue());
                     return vo;
                 }).collect(Collectors.toList());
-                result.add(new AuditVO(auditId, auditSet,
-                        auditId.equals(getAuditId(sinkNodeType, true)) ? 
sinkNodeType : null));
+                result.add(new AuditVO(auditId, auditSet, 
sinkAuditIds.contains(auditId) ? sinkNodeType : null));
             } else if (AuditQuerySource.ELASTICSEARCH == querySource) {
                 String index = String.format("%s_%s", 
request.getStartDate().replaceAll("-", ""), auditId);
                 if (!elasticsearchApi.indexExists(index)) {
@@ -325,6 +331,10 @@ public class AuditServiceImpl implements AuditService {
             auditSet.add(getAuditId(ClusterType.DATAPROXY, true));
         } else {
             auditSet.add(getAuditId(sinkNodeType, false));
+            InlongGroupEntity inlongGroup = 
inlongGroupMapper.selectByGroupId(groupId);
+            if 
(InlongConstants.DATASYNC_MODE.equals(inlongGroup.getInlongGroupMode())) {
+                auditSet.add(getAuditId(sinkNodeType, true));
+            }
         }
 
         // auto push source has no agent, return data-proxy audit data instead 
of agent

Reply via email to