This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 6d565cf430 [INLONG-9523][Manager] Support querying all audit
information based on IP address (#9537)
6d565cf430 is described below
commit 6d565cf4301c54cdaaf579a3d38154c6ac15baf0
Author: fuweng11 <[email protected]>
AuthorDate: Wed Jan 3 10:43:00 2024 +0800
[INLONG-9523][Manager] Support querying all audit information based on IP
address (#9537)
---
.../client/api/inner/client/AuditClient.java | 17 +++
.../manager/client/api/service/AuditApi.java | 3 +
.../manager/dao/mapper/AuditEntityMapper.java | 22 ++--
.../main/resources/mappers/AuditEntityMapper.xml | 50 +++++++--
.../inlong/manager/pojo/audit/AuditInfo.java | 3 +
.../inlong/manager/service/core/AuditService.java | 8 ++
.../service/core/impl/AuditServiceImpl.java | 118 ++++++++++++++++++++-
7 files changed, 200 insertions(+), 21 deletions(-)
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/AuditClient.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/AuditClient.java
index 8fab6f371b..90355d6493 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/AuditClient.java
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/AuditClient.java
@@ -56,6 +56,23 @@ public class AuditClient {
return response.getData();
}
+ /**
+ * Query audit data for list by condition
+ *
+ * @param request The audit request of query condition
+ * @return The result of query
+ */
+ public List<AuditVO> listAll(AuditRequest request) {
+ Preconditions.expectNotNull(request, "request cannot be null");
+ Preconditions.expectNotBlank(request.getInlongGroupId(),
ErrorCodeEnum.INVALID_PARAMETER,
+ "inlong group id cannot be empty");
+ Preconditions.expectNotBlank(request.getInlongStreamId(),
ErrorCodeEnum.INVALID_PARAMETER,
+ "inlong stream id cannot be empty");
+ Response<List<AuditVO>> response =
ClientUtils.executeHttpCall(auditApi.listAll(request));
+ ClientUtils.assertRespSuccess(response);
+ return response.getData();
+ }
+
/**
* Refresh the base item of audit cache.
*
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/AuditApi.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/AuditApi.java
index ab12f7608f..37fd3d60c8 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/AuditApi.java
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/AuditApi.java
@@ -32,6 +32,9 @@ public interface AuditApi {
@POST("audit/list")
Call<Response<List<AuditVO>>> list(@Body AuditRequest auditRequest);
+ /**
+ * Query audit data through the {@code audit/listAll} endpoint.
+ *
+ * @param auditRequest The audit request of query condition, sent as the request body
+ * @return The call whose response wraps the audit list
+ */
+ @POST("audit/listAll")
+ Call<Response<List<AuditVO>>> listAll(@Body AuditRequest auditRequest);
+
@POST("audit/refreshCache")
Call<Response<Boolean>> refreshCache();
diff --git
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/AuditEntityMapper.java
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/AuditEntityMapper.java
index 62330e6025..6f1a31dd9a 100644
---
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/AuditEntityMapper.java
+++
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/AuditEntityMapper.java
@@ -32,16 +32,16 @@ public interface AuditEntityMapper {
* @param groupId The groupId of inlong
* @param streamId The streamId of inlong
* @param auditId The auditId of inlong
- * @param sDate The start date
- * @param eDate The end date
+ * @param startDate The start date
+ * @param endData The end date
* @param format The format such as '%Y-%m-%d %H:%i:00'
* @return The result of query
*/
List<Map<String, Object>> sumByLogTs(@Param(value = "groupId") String
groupId,
@Param(value = "streamId") String streamId,
@Param(value = "auditId") String auditId,
- @Param(value = "sDate") String sDate,
- @Param(value = "eDate") String eDate,
+ @Param(value = "startDate") String startDate,
+ @Param(value = "endData") String endData,
@Param(value = "format") String format);
/**
@@ -49,14 +49,20 @@ public interface AuditEntityMapper {
*
* @param ip ip
* @param auditId The auditId of inlong
- * @param sDate The start date
- * @param eDate The end date
+ * @param startDate The start date
+ * @param endData The end date
* @param format The format such as '%Y-%m-%d %H:%i:00'
* @return The result of query
*/
List<Map<String, Object>> sumByLogTsAndIp(@Param(value = "ip") String ip,
@Param(value = "auditId") String auditId,
- @Param(value = "sDate") String sDate,
- @Param(value = "eDate") String eDate,
+ @Param(value = "startDate") String startDate,
+ @Param(value = "endData") String endData,
@Param(value = "format") String format);
+
+ /**
+ * Sum the audit data, grouped by inlong group id, inlong stream id and ip.
+ * NOTE(review): the sumGroupByIp statement in AuditEntityMapper.xml also has an optional ip filter,
+ * but no ip parameter is declared on this method - confirm whether one should be added.
+ *
+ * @param groupId The groupId of inlong
+ * @param streamId The streamId of inlong
+ * @param auditId The auditId of inlong
+ * @param startDate The start date, used as the inclusive lower bound of log_ts
+ * @param endData The end date, used as the exclusive upper bound of log_ts
+ * @return The result of query
+ */
+ List<Map<String, Object>> sumGroupByIp(@Param(value = "groupId") String
groupId,
+ @Param(value = "streamId") String streamId,
+ @Param(value = "auditId") String auditId,
+ @Param(value = "startDate") String startDate,
+ @Param(value = "endData") String endData);
}
\ No newline at end of file
diff --git
a/inlong-manager/manager-dao/src/main/resources/mappers/AuditEntityMapper.xml
b/inlong-manager/manager-dao/src/main/resources/mappers/AuditEntityMapper.xml
index e2090204ee..63d6af6c73 100644
---
a/inlong-manager/manager-dao/src/main/resources/mappers/AuditEntityMapper.xml
+++
b/inlong-manager/manager-dao/src/main/resources/mappers/AuditEntityMapper.xml
@@ -41,20 +41,28 @@
<result column="log_ts" property="logTs" jdbcType="VARCHAR"/>
<result column="total" property="total" jdbcType="BIGINT"/>
<result column="total_delay" property="totalDelay" jdbcType="BIGINT"/>
- <result column="total_size" property="totalSize" jdbcType="BIGINT"/>
+ </resultMap>
+
+    <!-- Result of sumGroupByIp: one row per (inlong_group_id, inlong_stream_id, ip).
+         total_size must be mapped explicitly: the service reads the "totalSize" key from the map. -->
+    <resultMap id="SumGroupByIdResultMap" type="java.util.Map">
+        <result column="inlong_group_id" property="inlongGroupId" jdbcType="VARCHAR"/>
+        <result column="inlong_stream_id" property="inlongStreamId" jdbcType="VARCHAR"/>
+        <result column="log_ts" property="logTs" jdbcType="VARCHAR"/>
+        <result column="ip" property="ip" jdbcType="VARCHAR"/>
+        <result column="total" property="total" jdbcType="BIGINT"/>
+        <result column="total_delay" property="totalDelay" jdbcType="BIGINT"/>
+        <result column="total_size" property="totalSize" jdbcType="BIGINT"/>
     </resultMap>
<select id="sumByLogTs" resultMap="SumByLogTsResultMap">
select inlong_group_id, inlong_stream_id, date_format(log_ts,
#{format, jdbcType=VARCHAR}) as log_ts, sum(`count`) as total, sum(`delay`) as
total_delay, sum(`size`) as total_size
from (
- select distinct ip, docker_id, thread_id, sdk_ts, packet_id,
log_ts, inlong_group_id, inlong_stream_id, audit_id, `count`, `size`, `delay`
- from apache_inlong_audit.audit_data
- where inlong_group_id = #{groupId,jdbcType=VARCHAR}
- and inlong_stream_id = #{streamId,jdbcType=VARCHAR}
- and audit_id = #{auditId,jdbcType=VARCHAR}
- and log_ts >= #{sDate, jdbcType=VARCHAR}
- and log_ts < #{eDate, jdbcType=VARCHAR}
- ) as sub
+ select distinct ip, docker_id, thread_id, sdk_ts, packet_id,
log_ts, inlong_group_id, inlong_stream_id, audit_id, `count`, `size`, `delay`
+ from apache_inlong_audit.audit_data
+ where inlong_group_id = #{groupId,jdbcType=VARCHAR}
+ and inlong_stream_id = #{streamId,jdbcType=VARCHAR}
+ and audit_id = #{auditId,jdbcType=VARCHAR}
+ and log_ts >= #{startDate, jdbcType=VARCHAR}
+ and log_ts < #{endData, jdbcType=VARCHAR}
+ ) as sub
group by log_ts, inlong_group_id, inlong_stream_id
order by log_ts
</select>
@@ -66,10 +74,30 @@
from apache_inlong_audit.audit_data
where ip = #{ip,jdbcType=VARCHAR}
and audit_id = #{auditId,jdbcType=VARCHAR}
- and log_ts >= #{sDate, jdbcType=VARCHAR}
- and log_ts < #{eDate, jdbcType=VARCHAR}
+ and log_ts >= #{startDate, jdbcType=VARCHAR}
+ and log_ts < #{endData, jdbcType=VARCHAR}
) as sub
group by log_ts, inlong_group_id, inlong_stream_id
order by log_ts
</select>
+    <!-- Sum count/delay/size per (inlong_group_id, inlong_stream_id, ip) after de-duplicating rows.
+         The ip filter is optional: the tests first check that an "ip" parameter was bound at all,
+         because reading an unbound name from the mapper parameter map raises a BindingException. -->
+    <select id="sumGroupByIp" resultMap="SumGroupByIdResultMap">
+        select inlong_group_id, inlong_stream_id, ip, sum(`count`) as total, sum(`delay`) as total_delay, sum(`size`) as total_size
+        from (
+            select distinct ip, docker_id, thread_id, sdk_ts, packet_id, log_ts, inlong_group_id, inlong_stream_id, audit_id, `count`, `size`, `delay`
+            from apache_inlong_audit.audit_data
+            <where>
+                audit_id = #{auditId,jdbcType=VARCHAR}
+                and log_ts &gt;= #{startDate, jdbcType=VARCHAR}
+                and log_ts &lt; #{endData, jdbcType=VARCHAR}
+                <if test="_parameter.containsKey('ip') and ip != null and ip != ''">
+                    and ip = #{ip,jdbcType=VARCHAR}
+                </if>
+                <if test="!_parameter.containsKey('ip') or ip == null or ip == ''">
+                    and inlong_group_id = #{groupId,jdbcType=VARCHAR}
+                    and inlong_stream_id = #{streamId,jdbcType=VARCHAR}
+                </if>
+            </where>
+        ) as sub
+        group by inlong_group_id, inlong_stream_id, ip
+    </select>
</mapper>
\ No newline at end of file
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditInfo.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditInfo.java
index 8b15edb3ac..71e74d4c9a 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditInfo.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditInfo.java
@@ -32,6 +32,9 @@ public class AuditInfo {
@ApiModelProperty(value = "inlong stream id")
private String inlongStreamId;
+ @ApiModelProperty(value = "ip")
+ private String ip;
+
@ApiModelProperty(value = "Audit log timestamp")
private String logTs;
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AuditService.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AuditService.java
index e37277aefb..08991917b6 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AuditService.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AuditService.java
@@ -38,6 +38,14 @@ public interface AuditService {
*/
List<AuditVO> listByCondition(AuditRequest request) throws Exception;
+ /**
+ * Query audit data by condition for every audit id in the request.
+ * NOTE(review): AuditServiceImpl sums the data per inlong group id, inlong stream id and ip -
+ * confirm that this grouping is the intended contract for all implementations.
+ *
+ * @param request The audit request of query condition
+ * @return The result of query
+ * @throws Exception If the underlying audit query fails
+ */
+ List<AuditVO> listAll(AuditRequest request) throws Exception;
+
List<AuditBaseResponse> getAuditBases();
/**
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
index f8ee0f618e..264911fb60 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
@@ -333,7 +333,7 @@ public class AuditServiceImpl implements AuditService {
}
} else if (AuditQuerySource.CLICKHOUSE == querySource) {
try (Connection connection = config.getCkConnection();
- PreparedStatement statement =
getAuditCkStatement(connection, groupId, streamId,
+ PreparedStatement statement =
getAuditCkStatementGroupByLogTs(connection, groupId, streamId,
request.getIp(), auditId,
request.getStartDate(),
request.getEndDate());
@@ -358,6 +358,60 @@ public class AuditServiceImpl implements AuditService {
return aggregateByTimeDim(result, request.getTimeStaticsDim());
}
+ @Override
+ public List<AuditVO> listAll(AuditRequest request) throws Exception {
+ List<AuditVO> result = new ArrayList<>();
+ AuditQuerySource querySource =
AuditQuerySource.valueOf(auditQuerySource);
+ for (String auditId : request.getAuditIds()) {
+ AuditBaseEntity auditBaseEntity = auditItemMap.get(auditId);
+ String auditName = "";
+ if (auditBaseEntity != null) {
+ auditName = auditBaseEntity.getName();
+ }
+ if (AuditQuerySource.MYSQL == querySource) {
+ // Support min agg at now
+ DateTime endDate =
DAY_DATE_FORMATTER.parseDateTime(request.getEndDate());
+ String endDateStr =
endDate.plusDays(1).toString(DAY_DATE_FORMATTER);
+ List<Map<String, Object>> sumList =
auditEntityMapper.sumGroupByIp(
+ request.getInlongGroupId(),
request.getInlongStreamId(), auditId, request.getStartDate(),
+ endDateStr);
+ List<AuditInfo> auditSet = sumList.stream().map(s -> {
+ AuditInfo vo = new AuditInfo();
+ vo.setInlongGroupId((String) s.get("inlongGroupId"));
+ vo.setInlongStreamId((String) s.get("inlongStreamId"));
+ vo.setLogTs((String) s.get("logTs"));
+ vo.setIp((String) s.get("ip"));
+ vo.setCount(((BigDecimal) s.get("total")).longValue());
+ vo.setDelay(((BigDecimal)
s.get("totalDelay")).longValue());
+ vo.setSize(((BigDecimal) s.get("totalSize")).longValue());
+ return vo;
+ }).collect(Collectors.toList());
+ result.add(new AuditVO(auditId, auditName, auditSet, null));
+ } else if (AuditQuerySource.CLICKHOUSE == querySource) {
+ try (Connection connection = config.getCkConnection();
+ PreparedStatement statement =
getAuditCkStatementGroupByIp(connection,
+ request.getInlongGroupId(),
request.getInlongStreamId(), request.getIp(), auditId,
+ request.getStartDate(), request.getEndDate());
+
+ ResultSet resultSet = statement.executeQuery()) {
+ List<AuditInfo> auditSet = new ArrayList<>();
+ while (resultSet.next()) {
+ AuditInfo vo = new AuditInfo();
+
vo.setInlongGroupId(resultSet.getString("inlong_group_id"));
+
vo.setInlongStreamId(resultSet.getString("inlong_stream_id"));
+ vo.setIp(resultSet.getString("ip"));
+ vo.setCount(resultSet.getLong("total"));
+ vo.setDelay(resultSet.getLong("total_delay"));
+ vo.setSize(resultSet.getLong("total_size"));
+ auditSet.add(vo);
+ }
+ result.add(new AuditVO(auditId, auditName, auditSet,
null));
+ }
+ }
+ }
+ return result;
+ }
+
@Override
public List<AuditBaseResponse> getAuditBases() {
List<AuditBaseEntity> auditBaseEntityList =
auditBaseMapper.selectAll();
@@ -430,7 +484,8 @@ public class AuditServiceImpl implements AuditService {
* @param endDate The end datetime of request
* @return The clickhouse Statement
*/
- private PreparedStatement getAuditCkStatement(Connection connection,
String groupId, String streamId, String ip,
+ private PreparedStatement getAuditCkStatementGroupByLogTs(Connection
connection, String groupId, String streamId,
+ String ip,
String auditId, String startDate, String endDate) throws
SQLException {
String start =
DAY_DATE_FORMATTER.parseDateTime(startDate).toString(SECOND_FORMAT);
String end =
DAY_DATE_FORMATTER.parseDateTime(endDate).plusDays(1).toString(SECOND_FORMAT);
@@ -466,6 +521,39 @@ public class AuditServiceImpl implements AuditService {
return statement;
}
+    /**
+     * Build the ClickHouse statement that sums audit data per inlong group id, inlong stream id and ip.
+     * When an ip is given the query is filtered by that ip instead of by group and stream.
+     *
+     * @param connection The ClickHouse connection
+     * @param groupId The groupId of inlong
+     * @param streamId The streamId of inlong
+     * @param ip The ip to filter by; blank means filter by groupId and streamId
+     * @param auditId The auditId of inlong
+     * @param startDate The start day of request, in the day format
+     * @param endDate The end day of request, in the day format; data of this day is included
+     * @return The clickhouse Statement
+     * @throws SQLException If the statement cannot be prepared
+     */
+    private PreparedStatement getAuditCkStatementGroupByIp(Connection connection, String groupId,
+            String streamId, String ip, String auditId, String startDate, String endDate) throws SQLException {
+
+        if (StringUtils.isNotBlank(ip)) {
+            return getAuditCkStatementByIpGroupByIp(connection, auditId, ip, startDate, endDate);
+        }
+        // Same window as the group-by-log-ts query: from the start day up to, but excluding, the day
+        // after the end day. Binding the raw end day would drop all data of the end day itself.
+        String start = DAY_DATE_FORMATTER.parseDateTime(startDate).toString(SECOND_FORMAT);
+        String end = DAY_DATE_FORMATTER.parseDateTime(endDate).plusDays(1).toString(SECOND_FORMAT);
+
+        // Query results are de-duplicated according to all fields.
+        String subQuery = new SQL()
+                .SELECT_DISTINCT("ip", "docker_id", "thread_id", "sdk_ts", "packet_id", "log_ts", "inlong_group_id",
+                        "inlong_stream_id", "audit_id", "count", "size", "delay")
+                .FROM("audit_data")
+                .WHERE("inlong_group_id = ?")
+                .WHERE("inlong_stream_id = ?")
+                .WHERE("audit_id = ?")
+                .WHERE("log_ts >= ?")
+                .WHERE("log_ts < ?")
+                .toString();
+
+        String sql = new SQL()
+                .SELECT("inlong_group_id", "inlong_stream_id", "sum(count) as total", "ip",
+                        "sum(delay) as total_delay", "sum(size) as total_size")
+                .FROM("(" + subQuery + ") as sub")
+                .GROUP_BY("inlong_group_id", "inlong_stream_id", "ip")
+                .toString();
+        PreparedStatement statement = connection.prepareStatement(sql);
+        statement.setString(1, groupId);
+        statement.setString(2, streamId);
+        statement.setString(3, auditId);
+        statement.setString(4, start);
+        statement.setString(5, end);
+        return statement;
+    }
+
/**
* Aggregate by time dim
*/
@@ -578,4 +666,30 @@ public class AuditServiceImpl implements AuditService {
return statement;
}
+    /**
+     * Build the ClickHouse statement that sums the audit data reported by one ip, grouped by
+     * inlong group id, inlong stream id and ip.
+     *
+     * @param connection The ClickHouse connection
+     * @param auditId The auditId of inlong
+     * @param ip The ip to filter by
+     * @param startDate The start day of request, in the day format
+     * @param endDate The end day of request, in the day format; data of this day is included
+     * @return The clickhouse Statement
+     * @throws SQLException If the statement cannot be prepared
+     */
+    private PreparedStatement getAuditCkStatementByIpGroupByIp(Connection connection, String auditId, String ip,
+            String startDate, String endDate) throws SQLException {
+        // Same window as the group-by-log-ts query: from the start day up to, but excluding, the day
+        // after the end day. Binding the raw end day would drop all data of the end day itself.
+        String start = DAY_DATE_FORMATTER.parseDateTime(startDate).toString(SECOND_FORMAT);
+        String end = DAY_DATE_FORMATTER.parseDateTime(endDate).plusDays(1).toString(SECOND_FORMAT);
+
+        // De-duplicate the raw rows on all fields before summing
+        String subQuery = new SQL()
+                .SELECT_DISTINCT("ip", "docker_id", "thread_id", "sdk_ts", "packet_id", "log_ts", "inlong_group_id",
+                        "inlong_stream_id", "audit_id", "count", "size", "delay")
+                .FROM("audit_data")
+                .WHERE("ip = ?")
+                .WHERE("audit_id = ?")
+                .WHERE("log_ts >= ?")
+                .WHERE("log_ts < ?")
+                .toString();
+
+        String sql = new SQL()
+                .SELECT("inlong_group_id", "inlong_stream_id", "ip", "sum(count) as total",
+                        "sum(delay) as total_delay", "sum(size) as total_size")
+                .FROM("(" + subQuery + ") as sub")
+                .GROUP_BY("inlong_group_id", "inlong_stream_id", "ip")
+                .toString();
+        PreparedStatement statement = connection.prepareStatement(sql);
+        statement.setString(1, ip);
+        statement.setString(2, auditId);
+        statement.setString(3, start);
+        statement.setString(4, end);
+        return statement;
+    }
+
}