This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new e83699433d [INLONG-8771][Manager] Fix audit data error and add source
type for DataSync (#8772)
e83699433d is described below
commit e83699433d00dfdc746a9e10018ff2deb7a5744f
Author: haifxu <[email protected]>
AuthorDate: Mon Aug 21 12:35:14 2023 +0800
[INLONG-8771][Manager] Fix audit data error and add source type for
DataSync (#8772)
---
.../manager/service/core/impl/AuditServiceImpl.java | 16 +++++++++++++---
1 file changed, 13 insertions(+), 3 deletions(-)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
index 8a526caa4a..36b7cb0043 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
@@ -28,11 +28,13 @@ import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.AuditBaseEntity;
import org.apache.inlong.manager.dao.entity.AuditSourceEntity;
+import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.apache.inlong.manager.dao.mapper.AuditBaseEntityMapper;
import org.apache.inlong.manager.dao.mapper.AuditEntityMapper;
import org.apache.inlong.manager.dao.mapper.AuditSourceEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
import org.apache.inlong.manager.pojo.audit.AuditInfo;
@@ -46,6 +48,7 @@ import org.apache.inlong.manager.service.core.AuditService;
import org.apache.inlong.manager.service.resource.sink.ck.ClickHouseConfig;
import org.apache.inlong.manager.service.resource.sink.es.ElasticsearchApi;
+import com.google.common.collect.Sets;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.ibatis.jdbc.SQL;
@@ -135,6 +138,8 @@ public class AuditServiceImpl implements AuditService {
private ClickHouseConfig config;
@Autowired
private AuditSourceEntityMapper auditSourceMapper;
+ @Autowired
+ private InlongGroupEntityMapper inlongGroupMapper;
@PostConstruct
public void initialize() {
@@ -248,6 +253,8 @@ public class AuditServiceImpl implements AuditService {
sinkNodeType = sinkEntity.getSinkType();
}
+ Set<String> sinkAuditIds = Sets.newHashSet(getAuditId(sinkNodeType,
true), getAuditId(sinkNodeType, false));
+
// properly overwrite audit ids by role and stream config
request.setAuditIds(getAuditIds(groupId, streamId, sinkNodeType));
@@ -265,11 +272,10 @@ public class AuditServiceImpl implements AuditService {
AuditInfo vo = new AuditInfo();
vo.setLogTs((String) s.get("logTs"));
vo.setCount(((BigDecimal) s.get("total")).longValue());
- vo.setCount(((BigDecimal)
s.get("totalDelay")).longValue());
+ vo.setDelay(((BigDecimal)
s.get("totalDelay")).longValue());
return vo;
}).collect(Collectors.toList());
- result.add(new AuditVO(auditId, auditSet,
- auditId.equals(getAuditId(sinkNodeType, true)) ?
sinkNodeType : null));
+ result.add(new AuditVO(auditId, auditSet,
sinkAuditIds.contains(auditId) ? sinkNodeType : null));
} else if (AuditQuerySource.ELASTICSEARCH == querySource) {
String index = String.format("%s_%s",
request.getStartDate().replaceAll("-", ""), auditId);
if (!elasticsearchApi.indexExists(index)) {
@@ -325,6 +331,10 @@ public class AuditServiceImpl implements AuditService {
auditSet.add(getAuditId(ClusterType.DATAPROXY, true));
} else {
auditSet.add(getAuditId(sinkNodeType, false));
+ InlongGroupEntity inlongGroup =
inlongGroupMapper.selectByGroupId(groupId);
+ if
(InlongConstants.DATASYNC_MODE.equals(inlongGroup.getInlongGroupMode())) {
+ auditSet.add(getAuditId(sinkNodeType, true));
+ }
}
// auto push source has no agent, return data-proxy audit data instead of agent