This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new e34bf1fde6 [INLONG-9435][Manager] Support querying audit data by audit
ID and obtaining audit ID information (#9436)
e34bf1fde6 is described below
commit e34bf1fde63eb0ad340f03847476d95ba4e2c1da
Author: fuweng11 <[email protected]>
AuthorDate: Thu Dec 7 13:05:39 2023 +0800
[INLONG-9435][Manager] Support querying audit data by audit ID and
obtaining audit ID information (#9436)
---
.../main/resources/mappers/AuditEntityMapper.xml | 6 +-
.../audit/{AuditVO.java => AuditBaseResponse.java} | 32 +++++------
.../inlong/manager/pojo/audit/AuditInfo.java | 9 +++
.../inlong/manager/pojo/audit/AuditRequest.java | 10 +---
.../apache/inlong/manager/pojo/audit/AuditVO.java | 5 +-
.../inlong/manager/service/core/AuditService.java | 3 +
.../service/core/impl/AuditServiceImpl.java | 65 +++++++++++++++-------
.../manager/web/controller/AuditController.java | 7 +++
8 files changed, 90 insertions(+), 47 deletions(-)
diff --git
a/inlong-manager/manager-dao/src/main/resources/mappers/AuditEntityMapper.xml
b/inlong-manager/manager-dao/src/main/resources/mappers/AuditEntityMapper.xml
index dd582d0d7d..cca03f50ef 100644
---
a/inlong-manager/manager-dao/src/main/resources/mappers/AuditEntityMapper.xml
+++
b/inlong-manager/manager-dao/src/main/resources/mappers/AuditEntityMapper.xml
@@ -36,6 +36,8 @@
</resultMap>
<resultMap id="SumByLogTsResultMap" type="java.util.Map">
+ <result column="inlong_group_id" property="inlongGroupId"
jdbcType="VARCHAR"/>
+ <result column="inlong_stream_id" property="inlongStreamId"
jdbcType="VARCHAR"/>
<result column="log_ts" property="logTs" jdbcType="VARCHAR"/>
<result column="total" property="total" jdbcType="BIGINT"/>
<result column="total_delay" property="totalDelay" jdbcType="BIGINT"/>
@@ -43,7 +45,7 @@
</resultMap>
<select id="sumByLogTs" resultMap="SumByLogTsResultMap">
- select date_format(log_ts, #{format, jdbcType=VARCHAR}) as log_ts,
sum(`count`) as total, sum(`delay`) as total_delay, sum(`size`) as total_size
+ select inlong_group_id, inlong_stream_id, date_format(log_ts,
#{format, jdbcType=VARCHAR}) as log_ts, sum(`count`) as total, sum(`delay`) as
total_delay, sum(`size`) as total_size
from (
select distinct ip, docker_id, thread_id, sdk_ts, packet_id,
log_ts, inlong_group_id, inlong_stream_id, audit_id, `count`, `size`, `delay`
from apache_inlong_audit.audit_data
@@ -53,7 +55,7 @@
and log_ts >= #{sDate, jdbcType=VARCHAR}
and log_ts < #{eDate, jdbcType=VARCHAR}
) as sub
- group by log_ts
+ group by log_ts, inlong_group_id, inlong_stream_id
order by log_ts
</select>
</mapper>
\ No newline at end of file
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditVO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditBaseResponse.java
similarity index 69%
copy from
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditVO.java
copy to
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditBaseResponse.java
index 68b8299c58..74da6aa46a 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditVO.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditBaseResponse.java
@@ -20,27 +20,25 @@ package org.apache.inlong.manager.pojo.audit;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
-import java.util.List;
-
/**
- * The VO of audit.
+ * Audit base info
*/
@Data
-public class AuditVO {
+public class AuditBaseResponse {
+
+ @ApiModelProperty(value = "Audit log timestamp")
+ private Integer id;
+
+ @ApiModelProperty(value = "Audit name")
+ private String name;
+
+ @ApiModelProperty(value = "Audit type")
+ private String type;
+
+ @ApiModelProperty(value = "is sent")
+ private Integer isSent;
@ApiModelProperty(value = "Audit id")
private String auditId;
- @ApiModelProperty(value = "Audit set")
- private List<AuditInfo> auditSet;
- @ApiModelProperty(value = "Node type")
- private String nodeType;
-
- public AuditVO() {
- }
-
- public AuditVO(String auditId, List<AuditInfo> auditSet, String nodeType) {
- this.auditId = auditId;
- this.auditSet = auditSet;
- this.nodeType = nodeType;
- }
+
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditInfo.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditInfo.java
index a1ac46f9e6..8b15edb3ac 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditInfo.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditInfo.java
@@ -26,12 +26,21 @@ import lombok.Data;
@Data
public class AuditInfo {
+ @ApiModelProperty(value = "inlong group id")
+ private String inlongGroupId;
+
+ @ApiModelProperty(value = "inlong stream id")
+ private String inlongStreamId;
+
@ApiModelProperty(value = "Audit log timestamp")
private String logTs;
+
@ApiModelProperty(value = "Audit count")
private long count;
+
@ApiModelProperty(value = "Audit delay")
private long delay;
+
@ApiModelProperty(value = "Audit size")
private long size;
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditRequest.java
index 4023d40dc0..980b48c333 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditRequest.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditRequest.java
@@ -24,8 +24,6 @@ import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
-import javax.validation.constraints.NotBlank;
-
import java.util.List;
/**
@@ -36,15 +34,13 @@ import java.util.List;
@ApiModel("Audit query request")
public class AuditRequest {
- @NotBlank(message = "inlongGroupId not be blank")
- @ApiModelProperty(value = "inlong group id", required = true)
+ @ApiModelProperty(value = "inlong group id")
private String inlongGroupId;
- @NotBlank(message = "inlongStreamId not be blank")
- @ApiModelProperty(value = "inlong stream id", required = true)
+ @ApiModelProperty(value = "inlong stream id")
private String inlongStreamId;
- @ApiModelProperty(value = "audit id list", required = true)
+ @ApiModelProperty(value = "audit id list")
private List<String> auditIds;
@ApiModelProperty(value = "sink id")
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditVO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditVO.java
index 68b8299c58..fa58ac17ed 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditVO.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditVO.java
@@ -30,6 +30,8 @@ public class AuditVO {
@ApiModelProperty(value = "Audit id")
private String auditId;
+ @ApiModelProperty(value = "Audit name")
+ private String auditName;
@ApiModelProperty(value = "Audit set")
private List<AuditInfo> auditSet;
@ApiModelProperty(value = "Node type")
@@ -38,8 +40,9 @@ public class AuditVO {
public AuditVO() {
}
- public AuditVO(String auditId, List<AuditInfo> auditSet, String nodeType) {
+ public AuditVO(String auditId, String auditName, List<AuditInfo> auditSet,
String nodeType) {
this.auditId = auditId;
+ this.auditName = auditName;
this.auditSet = auditSet;
this.nodeType = nodeType;
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AuditService.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AuditService.java
index 5068883b73..e37277aefb 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AuditService.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AuditService.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.service.core;
+import org.apache.inlong.manager.pojo.audit.AuditBaseResponse;
import org.apache.inlong.manager.pojo.audit.AuditRequest;
import org.apache.inlong.manager.pojo.audit.AuditSourceRequest;
import org.apache.inlong.manager.pojo.audit.AuditSourceResponse;
@@ -37,6 +38,8 @@ public interface AuditService {
*/
List<AuditVO> listByCondition(AuditRequest request) throws Exception;
+ List<AuditBaseResponse> getAuditBases();
+
/**
* Get audit id by type and isSent.
*
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
index 7ffdafb3b9..24478a3f39 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
@@ -37,6 +37,7 @@ import
org.apache.inlong.manager.dao.mapper.AuditSourceEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
+import org.apache.inlong.manager.pojo.audit.AuditBaseResponse;
import org.apache.inlong.manager.pojo.audit.AuditInfo;
import org.apache.inlong.manager.pojo.audit.AuditRequest;
import org.apache.inlong.manager.pojo.audit.AuditSourceRequest;
@@ -113,6 +114,8 @@ public class AuditServiceImpl implements AuditService {
private final Map<String, AuditBaseEntity> auditReceivedItemMap = new
ConcurrentHashMap<>();
+ private final Map<String, AuditBaseEntity> auditItemMap = new
ConcurrentHashMap<>();
+
// defaults to return all audit ids, can be overwritten in properties file
// see audit id definitions:
https://inlong.apache.org/docs/modules/audit/overview#audit-id
@Value("#{'${audit.admin.ids:3,4,5,6}'.split(',')}")
@@ -156,6 +159,7 @@ public class AuditServiceImpl implements AuditService {
try {
List<AuditBaseEntity> auditBaseEntities =
auditBaseMapper.selectAll();
for (AuditBaseEntity auditBaseEntity : auditBaseEntities) {
+ auditItemMap.put(auditBaseEntity.getAuditId(),
auditBaseEntity);
String type = auditBaseEntity.getType();
if (auditBaseEntity.getIsSent() == 1) {
auditSentItemMap.put(type, auditBaseEntity);
@@ -252,30 +256,39 @@ public class AuditServiceImpl implements AuditService {
if (sinkEntity != null) {
sinkNodeType = sinkEntity.getSinkType();
}
+ Map<String, String> auditIdMap = new HashMap<>();
- InlongGroupEntity groupEntity =
inlongGroupMapper.selectByGroupId(groupId);
- List<StreamSourceEntity> sourceEntityList =
sourceEntityMapper.selectByRelatedId(groupId, streamId, null);
- if (CollectionUtils.isNotEmpty(sourceEntityList)) {
- sourceNodeType = sourceEntityList.get(0).getSourceType();
- }
+ if (StringUtils.isNotBlank(groupId)) {
+ InlongGroupEntity groupEntity =
inlongGroupMapper.selectByGroupId(groupId);
+ List<StreamSourceEntity> sourceEntityList =
sourceEntityMapper.selectByRelatedId(groupId, streamId, null);
+ if (CollectionUtils.isNotEmpty(sourceEntityList)) {
+ sourceNodeType = sourceEntityList.get(0).getSourceType();
+ }
- Map<String, String> auditIdMap = new HashMap<>();
- auditIdMap.put(getAuditId(sinkNodeType, true), sinkNodeType);
+ auditIdMap.put(getAuditId(sinkNodeType, true), sinkNodeType);
- if (CollectionUtils.isEmpty(request.getAuditIds())) {
- // properly overwrite audit ids by role and stream config
- if
(InlongConstants.DATASYNC_MODE.equals(groupEntity.getInlongGroupMode())) {
- auditIdMap.put(getAuditId(sourceNodeType, false),
sourceNodeType);
- request.setAuditIds(getAuditIds(groupId, streamId,
sourceNodeType, sinkNodeType));
- } else {
- auditIdMap.put(getAuditId(sinkNodeType, false), sinkNodeType);
- request.setAuditIds(getAuditIds(groupId, streamId, null,
sinkNodeType));
+ if (CollectionUtils.isEmpty(request.getAuditIds())) {
+ // properly overwrite audit ids by role and stream config
+ if
(InlongConstants.DATASYNC_MODE.equals(groupEntity.getInlongGroupMode())) {
+ auditIdMap.put(getAuditId(sourceNodeType, false),
sourceNodeType);
+ request.setAuditIds(getAuditIds(groupId, streamId,
sourceNodeType, sinkNodeType));
+ } else {
+ auditIdMap.put(getAuditId(sinkNodeType, false),
sinkNodeType);
+ request.setAuditIds(getAuditIds(groupId, streamId, null,
sinkNodeType));
+ }
}
+ } else if (CollectionUtils.isEmpty(request.getAuditIds())) {
+ throw new BusinessException("audits id is empty");
}
List<AuditVO> result = new ArrayList<>();
AuditQuerySource querySource =
AuditQuerySource.valueOf(auditQuerySource);
for (String auditId : request.getAuditIds()) {
+ AuditBaseEntity auditBaseEntity = auditItemMap.get(auditId);
+ String auditName = "";
+ if (auditBaseEntity != null) {
+ auditName = auditBaseEntity.getName();
+ }
if (AuditQuerySource.MYSQL == querySource) {
String format = "%Y-%m-%d %H:%i:00";
// Support min agg at now
@@ -285,13 +298,15 @@ public class AuditServiceImpl implements AuditService {
groupId, streamId, auditId, request.getStartDate(),
endDateStr, format);
List<AuditInfo> auditSet = sumList.stream().map(s -> {
AuditInfo vo = new AuditInfo();
+ vo.setInlongGroupId((String) s.get("inlongGroupId"));
+ vo.setInlongStreamId((String) s.get("inlongStreamId"));
vo.setLogTs((String) s.get("logTs"));
vo.setCount(((BigDecimal) s.get("total")).longValue());
vo.setDelay(((BigDecimal)
s.get("totalDelay")).longValue());
vo.setSize(((BigDecimal) s.get("totalSize")).longValue());
return vo;
}).collect(Collectors.toList());
- result.add(new AuditVO(auditId, auditSet,
auditIdMap.getOrDefault(auditId, null)));
+ result.add(new AuditVO(auditId, auditName, auditSet,
auditIdMap.getOrDefault(auditId, null)));
} else if (AuditQuerySource.ELASTICSEARCH == querySource) {
String index = String.format("%s_%s",
request.getStartDate().replaceAll("-", ""), auditId);
if (!elasticsearchApi.indexExists(index)) {
@@ -310,7 +325,7 @@ public class AuditServiceImpl implements AuditService {
vo.setDelay((long) ((ParsedSum)
bucket.getAggregations().asList().get(1)).getValue());
return vo;
}).collect(Collectors.toList());
- result.add(new AuditVO(auditId, auditSet,
auditIdMap.getOrDefault(auditId, null)));
+ result.add(new AuditVO(auditId, auditName, auditSet,
auditIdMap.getOrDefault(auditId, null)));
}
}
} else if (AuditQuerySource.CLICKHOUSE == querySource) {
@@ -322,13 +337,15 @@ public class AuditServiceImpl implements AuditService {
List<AuditInfo> auditSet = new ArrayList<>();
while (resultSet.next()) {
AuditInfo vo = new AuditInfo();
+
vo.setInlongGroupId(resultSet.getString("inlong_group_id"));
+
vo.setInlongStreamId(resultSet.getString("inlong_stream_id"));
vo.setLogTs(resultSet.getString("log_ts"));
vo.setCount(resultSet.getLong("total"));
vo.setDelay(resultSet.getLong("total_delay"));
vo.setSize(resultSet.getLong("total_size"));
auditSet.add(vo);
}
- result.add(new AuditVO(auditId, auditSet,
auditIdMap.getOrDefault(auditId, null)));
+ result.add(new AuditVO(auditId, auditName, auditSet,
auditIdMap.getOrDefault(auditId, null)));
}
}
}
@@ -336,6 +353,12 @@ public class AuditServiceImpl implements AuditService {
return aggregateByTimeDim(result, request.getTimeStaticsDim());
}
+ @Override
+ public List<AuditBaseResponse> getAuditBases() {
+ List<AuditBaseEntity> auditBaseEntityList =
auditBaseMapper.selectAll();
+ return CommonBeanUtils.copyListProperties(auditBaseEntityList,
AuditBaseResponse::new);
+ }
+
private List<String> getAuditIds(String groupId, String streamId, String
sourceNodeType, String sinkNodeType) {
Set<String> auditSet =
LoginUserUtils.getLoginUser().getRoles().contains(UserRoleCode.TENANT_ADMIN)
? new HashSet<>(auditIdListForAdmin)
@@ -420,9 +443,10 @@ public class AuditServiceImpl implements AuditService {
.toString();
String sql = new SQL()
- .SELECT("log_ts", "sum(count) as total", "sum(delay) as
total_delay", "sum(size) as total_size")
+ .SELECT("inlong_group_id", "inlong_stream_id", "log_ts",
"sum(count) as total",
+ "sum(delay) as total_delay", "sum(size) as total_size")
.FROM("(" + subQuery + ") as sub")
- .GROUP_BY("log_ts")
+ .GROUP_BY("log_ts", "inlong_group_id", "inlong_stream_id")
.ORDER_BY("log_ts")
.toString();
@@ -465,6 +489,7 @@ public class AuditServiceImpl implements AuditService {
HashMap<String, AtomicLong> delayMap = new HashMap<>();
HashMap<String, AtomicLong> sizeMap = new HashMap<>();
statInfo.setAuditId(auditVO.getAuditId());
+ statInfo.setAuditName(auditVO.getAuditName());
statInfo.setNodeType(auditVO.getNodeType());
for (AuditInfo auditInfo : auditVO.getAuditSet()) {
String statKey = formatLogTime(auditInfo.getLogTs(), format);
diff --git
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/AuditController.java
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/AuditController.java
index a7128085e6..9c8beb0a4b 100644
---
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/AuditController.java
+++
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/AuditController.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.web.controller;
+import org.apache.inlong.manager.pojo.audit.AuditBaseResponse;
import org.apache.inlong.manager.pojo.audit.AuditRequest;
import org.apache.inlong.manager.pojo.audit.AuditSourceRequest;
import org.apache.inlong.manager.pojo.audit.AuditSourceResponse;
@@ -69,6 +70,12 @@ public class AuditController {
return Response.success(auditService.updateAuditSource(request,
LoginUserUtils.getLoginUser().getName()));
}
+ @ApiOperation(value = "Get the audit base info")
+ @GetMapping("/audit/getAuditBases")
+ public Response<List<AuditBaseResponse>> getAuditBases() {
+ return Response.success(auditService.getAuditBases());
+ }
+
@ApiOperation(value = "Get the audit source")
@GetMapping("/audit/getSource")
public Response<AuditSourceResponse> getAuditSource() {