This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 560c1ec824 [INLONG-12098][Manager] Improve audit query nodeType
mapping completeness and avoid null audit IDs(#12099)
560c1ec824 is described below
commit 560c1ec824a592d054c4b24c4a03e4baa1810e45
Author: healchow <[email protected]>
AuthorDate: Wed Mar 4 15:33:23 2026 +0800
[INLONG-12098][Manager] Improve audit query nodeType mapping completeness
and avoid null audit IDs(#12099)
---
.../inlong/manager/pojo/audit/AuditRequest.java | 9 +
.../manager/service/audit/AuditRunnable.java | 28 +--
.../service/core/impl/AuditServiceImpl.java | 237 ++++++++++++++-------
.../src/main/resources/application.properties | 1 +
4 files changed, 181 insertions(+), 94 deletions(-)
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditRequest.java
index d55b1b392f..535efc7687 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditRequest.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditRequest.java
@@ -46,12 +46,21 @@ public class AuditRequest {
@ApiModelProperty(value = "audit id list")
private List<String> auditIds;
+ @ApiModelProperty(value = "Source type")
+ private String sourceType;
+
+ @ApiModelProperty(value = "Need source audit or not, default is true, means need source audit")
+ private Boolean needSourceAudit = true;
+
@ApiModelProperty(value = "sink id")
private Integer sinkId;
@ApiModelProperty(value = "sink type")
private String sinkType;
+ @ApiModelProperty(value = "Need sink audit or not, default is true, means need sink audit")
+ private Boolean needSinkAudit = true;
+
@ApiModelProperty(value = "query start date, format by 'yyyy-MM-dd'",
required = true, example = "2022-01-01")
private String startDate;
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/audit/AuditRunnable.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/audit/AuditRunnable.java
index 72c4325f04..20c4b31c5e 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/audit/AuditRunnable.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/audit/AuditRunnable.java
@@ -37,6 +37,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
/**
@@ -51,21 +52,21 @@ public class AuditRunnable implements Runnable {
private static final DateTimeFormatter SECOND_DATE_FORMATTER =
DateTimeFormat.forPattern(SECOND_FORMAT);
private static final DateTimeFormatter DAY_DATE_FORMATTER =
DateTimeFormat.forPattern(DAY_FORMAT);
- private AuditRequest request;
- private String auditId;
- private String auditName;
- private List<AuditVO> auditVOList;
- private CountDownLatch latch;
- private RestTemplate restTemplate;
- private String auditUrl;
- private Map<String, String> auditIdMap;
- private Boolean listAll;
+ private final AuditRequest request;
+ private final String auditId;
+ private final String auditName;
+ private final ConcurrentLinkedQueue<AuditVO> auditResultQueue;
+ private final CountDownLatch latch;
+ private final RestTemplate restTemplate;
+ private final String auditUrl;
+ private final Map<String, String> auditId2NodeTypeMap;
+ private final Boolean listAll;
public AuditRunnable(
AuditRequest request,
String auditId,
String auditName,
- List<AuditVO> auditVOList,
+ ConcurrentLinkedQueue<AuditVO> auditResultQueue,
CountDownLatch latch,
RestTemplate restTemplate,
String auditUrl,
@@ -74,11 +75,11 @@ public class AuditRunnable implements Runnable {
this.request = request;
this.auditId = auditId;
this.auditName = auditName;
- this.auditVOList = auditVOList;
+ this.auditResultQueue = auditResultQueue;
this.latch = latch;
this.restTemplate = restTemplate;
this.auditUrl = auditUrl;
- this.auditIdMap = auditIdMap == null ? new HashMap<>() : auditIdMap;
+ this.auditId2NodeTypeMap = auditIdMap == null ? new HashMap<>() : auditIdMap;
this.listAll = listAll;
}
@@ -92,7 +93,8 @@ public class AuditRunnable implements Runnable {
} else {
auditSet = getAuditInfoList(request,
request.getInlongGroupId(), request.getInlongStreamId(), auditId);
}
- auditVOList.add(new AuditVO(auditId, auditName, auditSet, auditIdMap.getOrDefault(auditId, null)));
+ auditResultQueue
+ .add(new AuditVO(auditId, auditName, auditSet, auditId2NodeTypeMap.getOrDefault(auditId, null)));
} catch (Exception e) {
LOGGER.error("query audit failed for request={}", request);
throw new BusinessException("query audit failed");
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
index 2ae9342515..00482b1fcf 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
@@ -43,7 +43,7 @@ import org.apache.inlong.manager.pojo.user.UserRoleCode;
import org.apache.inlong.manager.service.audit.AuditRunnable;
import org.apache.inlong.manager.service.core.AuditService;
-import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,12 +63,15 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import static org.apache.inlong.common.enums.IndicatorType.RECEIVED_SUCCESS;
+
/**
* Audit service layer implementation
*/
@@ -81,7 +84,7 @@ public class AuditServiceImpl implements AuditService {
// key 1: type of audit, like pulsar, hive, key 2: indicator type, value :
entity of audit base item
private final Map<String, Map<Integer, AuditInformation>>
auditIndicatorMap = new ConcurrentHashMap<>();
private final Map<String, String> auditItemMap = new ConcurrentHashMap<>();
- private ScheduledExecutorService executor =
Executors.newScheduledThreadPool(10);
+ private final ScheduledExecutorService executor =
Executors.newScheduledThreadPool(10);
// defaults to return all audit ids, can be overwritten in properties file
// see audit id definitions:
https://inlong.apache.org/docs/modules/audit/overview#audit-id
@Value("#{'${audit.admin.ids:3,4,5,6}'.split(',')}")
@@ -90,6 +93,8 @@ public class AuditServiceImpl implements AuditService {
private List<String> auditIdListForUser;
@Value("${audit.query.url:http://127.0.0.1:10080}")
private String auditQueryUrl;
+ @Value("${audit.query.queryTimeoutSeconds:30}")
+ private int queryTimeoutSeconds;
@Autowired
private StreamSinkEntityMapper sinkEntityMapper;
@@ -168,83 +173,138 @@ public class AuditServiceImpl implements AuditService {
// for now, we use the first sink type only.
// this is temporary behavior before multiple sinks in one stream is
fully supported.
- String sinkNodeType = null;
- String sourceNodeType = null;
- Integer sinkId = request.getSinkId();
- StreamSinkEntity sinkEntity = null;
- if (StringUtils.isNotBlank(streamId)) {
- List<StreamSinkEntity> sinkEntityList =
sinkEntityMapper.selectByRelatedId(groupId, streamId);
- if (sinkId != null) {
- sinkEntity = sinkEntityMapper.selectByPrimaryKey(sinkId);
- } else if (CollectionUtils.isNotEmpty(sinkEntityList)) {
- sinkEntity = sinkEntityList.get(0);
- }
- // if sink info is existed, get sink type for query audit info.
- if (sinkEntity != null) {
- sinkNodeType = sinkEntity.getSinkType();
- }
- } else {
- sinkNodeType = request.getSinkType();
+ String sinkNodeType = request.getSinkType();
+ // if sinkNodeType is not blank, should use directly
+ if (StringUtils.isBlank(sinkNodeType) &&
Boolean.TRUE.equals(request.getNeedSinkAudit())) {
+ sinkNodeType = getSinkNodeType(request.getSinkId(), groupId,
streamId);
}
- Map<String, String> auditIdMap = new HashMap<>();
+ // key: auditId, value: nodeType
+ Map<String, String> auditId2NodeTypeMap = new HashMap<>(8);
+ fillAuditId2SinkNodeTypeMap(auditId2NodeTypeMap, sinkNodeType);
+
+ // set sourceNodeType is sinkNodeType firstly
+ String sourceNodeType = request.getSourceType();
+ if (StringUtils.isBlank(sourceNodeType)) {
+ sourceNodeType = sinkNodeType;
+ }
if (StringUtils.isNotBlank(groupId)) {
- InlongGroupEntity groupEntity =
inlongGroupMapper.selectByGroupId(groupId);
- List<StreamSourceEntity> sourceEntityList =
sourceEntityMapper.selectByRelatedId(groupId, streamId, null);
- if (CollectionUtils.isNotEmpty(sourceEntityList)) {
- sourceNodeType = sourceEntityList.get(0).getSourceType();
+ List<StreamSourceEntity> sourceList = null;
+ if (StringUtils.isNotBlank(streamId)) {
+ // if sourceNodeType is blank, get sourceNodeType by groupId
and streamId from StreamSourceEntity
+ if (StringUtils.isBlank(sourceNodeType) &&
Boolean.TRUE.equals(request.getNeedSourceAudit())) {
+ sourceList = sourceEntityMapper.selectByRelatedId(groupId,
streamId, null);
+ if (CollectionUtils.isNotEmpty(sourceList)) {
+ sourceNodeType = sourceList.get(0).getSourceType();
+ fillAuditId2SourceNodeTypeMap(auditId2NodeTypeMap,
sourceNodeType);
+ }
+ }
}
- auditIdMap.put(getAuditId(sinkNodeType,
IndicatorType.SEND_SUCCESS), sinkNodeType);
-
if (CollectionUtils.isEmpty(request.getAuditIds())) {
- // properly overwrite audit ids by role and stream config
- if
(InlongConstants.DATASYNC_REALTIME_MODE.equals(groupEntity.getInlongGroupMode())
- ||
InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupEntity.getInlongGroupMode()))
{
- List<AuditInformation> cdcAuditInfoList =
- getCdcAuditInfoList(sourceNodeType,
IndicatorType.RECEIVED_SUCCESS);
- List<String> cdcAuditIdList =
- cdcAuditInfoList.stream().map(v ->
String.valueOf(v.getAuditId()))
- .collect(Collectors.toList());
- if (CollectionUtils.isNotEmpty(cdcAuditIdList)) {
- String tempSourceNodeType = sourceNodeType;
- cdcAuditIdList.forEach(v -> auditIdMap.put(v,
tempSourceNodeType));
- }
- auditIdMap.put(getAuditId(sourceNodeType,
IndicatorType.RECEIVED_SUCCESS), sourceNodeType);
- request.setAuditIds(getAuditIds(groupId, streamId,
sourceNodeType, sinkNodeType));
- } else {
- auditIdMap.put(getAuditId(sinkNodeType,
IndicatorType.RECEIVED_SUCCESS), sinkNodeType);
- request.setAuditIds(getAuditIds(groupId, streamId, null,
sinkNodeType));
- }
+ InlongGroupEntity groupEntity =
inlongGroupMapper.selectByGroupId(groupId);
+
request.setAuditIds(getAuditIds(groupEntity.getInlongGroupMode(),
sourceNodeType, sinkNodeType,
+ auditId2NodeTypeMap, sourceList));
}
} else if (CollectionUtils.isEmpty(request.getAuditIds())) {
- throw new BusinessException("audits id is empty");
+ // groupId and auditIds cannot both be empty
+ throw new BusinessException("InlongGroupId is blank, auditIds cannot be empty");
}
- List<AuditVO> result = new ArrayList<>();
+ ConcurrentLinkedQueue<AuditVO> auditResultQueue = new
ConcurrentLinkedQueue<>();
CountDownLatch latch = new
CountDownLatch(request.getAuditIds().size());
for (String auditId : request.getAuditIds()) {
String auditName = auditItemMap.get(auditId);
- this.executor.execute(new AuditRunnable(request, auditId,
auditName, result, latch, restTemplate,
- auditQueryUrl, auditIdMap, false));
+ this.executor.execute(new AuditRunnable(request, auditId,
auditName, auditResultQueue, latch,
+ restTemplate, auditQueryUrl, auditId2NodeTypeMap, false));
+ }
+
+ boolean completed = latch.await(queryTimeoutSeconds, TimeUnit.SECONDS);
+ if (!completed) {
+ LOGGER.warn("Timeout to list audit for request={}", request);
+ } else {
+ LOGGER.info("Success to list audit for request={}", request);
+ }
+
+ return new ArrayList<>(auditResultQueue);
+ }
+
+ private String getSinkNodeType(Integer sinkId, String groupId, String
streamId) {
+ StreamSinkEntity sinkEntity = null;
+ if (sinkId != null && sinkId > 0) {
+ sinkEntity = sinkEntityMapper.selectByPrimaryKey(sinkId);
+ } else if (StringUtils.isNoneBlank(groupId, streamId)) {
+ List<StreamSinkEntity> sinkEntityList =
sinkEntityMapper.selectByRelatedId(groupId, streamId);
+ if (CollectionUtils.isNotEmpty(sinkEntityList)) {
+ sinkEntity = sinkEntityList.get(0);
+ }
+ }
+ // if sink info is existed, get sink type for query audit info.
+ String sinkNodeType = null;
+ if (sinkEntity != null) {
+ sinkNodeType = sinkEntity.getSinkType();
+ }
+
+ return sinkNodeType;
+ }
+
+ /**
+ * Fill the audit ID to node type mapping as fully as possible, avoiding
null nodeType values.
+ *
+ * @param sourceNodeType source node type
+ */
+ private void fillAuditId2SourceNodeTypeMap(Map<String, String> resultMap,
String sourceNodeType) {
+ if (StringUtils.isBlank(sourceNodeType)) {
+ return;
+ }
+
+ String sourceReceivedSuccessId = getAuditId(sourceNodeType,
RECEIVED_SUCCESS);
+ if (StringUtils.isNotBlank(sourceReceivedSuccessId)) {
+ resultMap.put(sourceReceivedSuccessId, sourceNodeType);
+ }
+ }
+
+ /**
+ * Fill the audit ID to node type mapping as fully as possible, avoiding
null nodeType values.
+ *
+ * @param sinkNodeType sink node type
+ */
+ private void fillAuditId2SinkNodeTypeMap(Map<String, String> resultMap,
String sinkNodeType) {
+ if (StringUtils.isBlank(sinkNodeType)) {
+ return;
+ }
+
+ // Prevent null sink audit ID to avoid potential NPE during iteration
+ String sinkSendSuccessId = getAuditId(sinkNodeType,
IndicatorType.SEND_SUCCESS);
+ if (StringUtils.isNotBlank(sinkSendSuccessId)) {
+ resultMap.put(sinkSendSuccessId, sinkNodeType);
+ }
+
+ String sinkReceivedSuccessId = getAuditId(sinkNodeType,
RECEIVED_SUCCESS);
+ if (StringUtils.isNotBlank(sinkReceivedSuccessId)) {
+ resultMap.put(sinkReceivedSuccessId, sinkNodeType);
}
- latch.await(30, TimeUnit.SECONDS);
- LOGGER.info("success to query audit list for request={}", request);
- return result;
}
@Override
public List<AuditVO> listAll(AuditRequest request) throws Exception {
- List<AuditVO> result = new ArrayList<>();
+ ConcurrentLinkedQueue<AuditVO> auditResultQueue = new
ConcurrentLinkedQueue<>();
CountDownLatch latch = new
CountDownLatch(request.getAuditIds().size());
for (String auditId : request.getAuditIds()) {
String auditName = auditItemMap.get(auditId);
- this.executor.execute(new AuditRunnable(request, auditId,
auditName, result, latch, restTemplate,
+ this.executor.execute(new AuditRunnable(request, auditId,
auditName, auditResultQueue, latch, restTemplate,
auditQueryUrl, null, true));
}
- latch.await(30, TimeUnit.SECONDS);
- return result;
+
+ boolean completed = latch.await(queryTimeoutSeconds, TimeUnit.SECONDS);
+ if (!completed) {
+ LOGGER.warn("Timeout to list all audit for request={}", request);
+ } else {
+ LOGGER.info("Success to list all audit for request={}", request);
+ }
+
+ return new ArrayList<>(auditResultQueue);
}
@Override
@@ -255,56 +315,71 @@ public class AuditServiceImpl implements AuditService {
return AuditOperator.getInstance().getAllAuditInformation();
}
- private List<String> getAuditIds(String groupId, String streamId, String
sourceNodeType, String sinkNodeType) {
- Set<String> auditSet =
LoginUserUtils.getLoginUser().getRoles().contains(UserRoleCode.TENANT_ADMIN)
+ private List<String> getAuditIds(Integer groupMode, String sourceNodeType,
String sinkNodeType,
+ Map<String, String> auditId2NodeTypeMap, List<StreamSourceEntity>
sourceEntityList) {
+ // properly overwrite audit ids by role and stream config
+ boolean isDataSyncMode =
InlongConstants.DATASYNC_REALTIME_MODE.equals(groupMode)
+ || InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupMode);
+ if (isDataSyncMode) {
+ List<AuditInformation> cdcAuditInfoList =
getCdcAuditInfoList(sourceNodeType, RECEIVED_SUCCESS);
+ List<String> cdcAuditIds = null;
+ if (CollectionUtils.isNotEmpty(cdcAuditInfoList)) {
+ cdcAuditIds = cdcAuditInfoList.stream()
+ .map(v -> String.valueOf(v.getAuditId()))
+ .collect(Collectors.toList());
+ cdcAuditIds.forEach(v -> auditId2NodeTypeMap.put(v,
sourceNodeType));
+ }
+
+ return getAuditIds(true, cdcAuditIds, sourceNodeType,
sinkNodeType, sourceEntityList);
+ } else {
+ return getAuditIds(false, null, null, sinkNodeType,
sourceEntityList);
+ }
+ }
+
+ private List<String> getAuditIds(boolean isDataSyncMode, List<String>
cdcAuditIds,
+ String sourceNodeType, String sinkNodeType,
List<StreamSourceEntity> sourceList) {
+ Set<String> auditIdSet =
LoginUserUtils.getLoginUser().getRoles().contains(UserRoleCode.TENANT_ADMIN)
? new HashSet<>(auditIdListForAdmin)
: new HashSet<>(auditIdListForUser);
// if no sink is configured, return data-proxy output instead of sort
if (sinkNodeType == null) {
- auditSet.add(getAuditId(ClusterType.DATAPROXY,
IndicatorType.SEND_SUCCESS));
+ auditIdSet.add(getAuditId(ClusterType.DATAPROXY,
IndicatorType.SEND_SUCCESS));
} else {
- auditSet.add(getAuditId(sinkNodeType, IndicatorType.SEND_SUCCESS));
- InlongGroupEntity inlongGroup =
inlongGroupMapper.selectByGroupId(groupId);
- if
(InlongConstants.DATASYNC_REALTIME_MODE.equals(inlongGroup.getInlongGroupMode())
- ||
InlongConstants.DATASYNC_OFFLINE_MODE.equals(inlongGroup.getInlongGroupMode()))
{
- List<AuditInformation> cdcAuditInfoList =
- getCdcAuditInfoList(sourceNodeType,
IndicatorType.RECEIVED_SUCCESS);
- if (CollectionUtils.isNotEmpty(cdcAuditInfoList)) {
- List<String> cdcAuditIdList =
- cdcAuditInfoList.stream().map(v ->
String.valueOf(v.getAuditId()))
- .collect(Collectors.toList());
- auditSet.addAll(cdcAuditIdList);
+ auditIdSet.add(getAuditId(sinkNodeType,
IndicatorType.SEND_SUCCESS));
+ if (isDataSyncMode) {
+ // if sourceNodeType is not blank, add cdc audit ids into
auditSet
+ if (StringUtils.isNotBlank(sourceNodeType)) {
+ if (CollectionUtils.isNotEmpty(cdcAuditIds)) {
+ auditIdSet.addAll(cdcAuditIds);
+ }
+ // add source received audit id into auditSet
+ auditIdSet.add(getAuditId(sourceNodeType,
RECEIVED_SUCCESS));
}
- auditSet.add(getAuditId(sourceNodeType,
IndicatorType.RECEIVED_SUCCESS));
} else {
- auditSet.add(getAuditId(sinkNodeType,
IndicatorType.RECEIVED_SUCCESS));
+ auditIdSet.add(getAuditId(sinkNodeType, RECEIVED_SUCCESS));
}
}
// auto push source has no agent, return data-proxy audit data instead
of agent
- List<StreamSourceEntity> sourceList =
sourceEntityMapper.selectByRelatedId(groupId, streamId, null);
if (CollectionUtils.isEmpty(sourceList)
|| sourceList.stream().allMatch(s ->
SourceType.AUTO_PUSH.equals(s.getSourceType()))) {
// need data_proxy received type when agent has received type
- boolean dpReceivedNeeded =
auditSet.contains(getAuditId(ClusterType.AGENT,
IndicatorType.RECEIVED_SUCCESS));
+ boolean dpReceivedNeeded =
auditIdSet.contains(getAuditId(ClusterType.AGENT, RECEIVED_SUCCESS));
if (dpReceivedNeeded) {
- auditSet.add(getAuditId(ClusterType.DATAPROXY,
IndicatorType.RECEIVED_SUCCESS));
+ auditIdSet.add(getAuditId(ClusterType.DATAPROXY,
RECEIVED_SUCCESS));
}
}
- return new ArrayList<>(auditSet);
+ return new ArrayList<>(auditIdSet);
}
@Override
public List<AuditProxy> getAuditProxy(String component) throws Exception {
try {
- StringBuilder builder = new StringBuilder();
- builder.append(auditQueryUrl)
- .append("/audit/query/getAuditProxy?")
- .append("component=")
- .append(component);
- String url = builder.toString();
+ String url = auditQueryUrl
+ + "/audit/query/getAuditProxy?"
+ + "component=" + component;
LOGGER.info("query audit url ={}", url);
AuditProxyResponse result = HttpUtils.request(restTemplate,
url,
diff --git
a/inlong-manager/manager-web/src/main/resources/application.properties
b/inlong-manager/manager-web/src/main/resources/application.properties
index 07a1e53806..8042e37f06 100644
--- a/inlong-manager/manager-web/src/main/resources/application.properties
+++ b/inlong-manager/manager-web/src/main/resources/application.properties
@@ -63,6 +63,7 @@ openapi.auth.enabled=false
# Audit view by role, see audit id definitions:
https://inlong.apache.org/docs/modules/audit/overview#audit-id
audit.admin.ids=3,4,5,6
audit.user.ids=3,4,5,6
+audit.query.queryTimeoutSeconds=30
# Pulsar message query thread pool configuration
pulsar.query.poolSize=10