This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d565cf430 [INLONG-9523][Manager] Support querying all audit 
information based on IP address (#9537)
6d565cf430 is described below

commit 6d565cf4301c54cdaaf579a3d38154c6ac15baf0
Author: fuweng11 <[email protected]>
AuthorDate: Wed Jan 3 10:43:00 2024 +0800

    [INLONG-9523][Manager] Support querying all audit information based on IP 
address (#9537)
---
 .../client/api/inner/client/AuditClient.java       |  17 +++
 .../manager/client/api/service/AuditApi.java       |   3 +
 .../manager/dao/mapper/AuditEntityMapper.java      |  22 ++--
 .../main/resources/mappers/AuditEntityMapper.xml   |  50 +++++++--
 .../inlong/manager/pojo/audit/AuditInfo.java       |   3 +
 .../inlong/manager/service/core/AuditService.java  |   8 ++
 .../service/core/impl/AuditServiceImpl.java        | 118 ++++++++++++++++++++-
 7 files changed, 200 insertions(+), 21 deletions(-)

diff --git 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/AuditClient.java
 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/AuditClient.java
index 8fab6f371b..90355d6493 100644
--- 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/AuditClient.java
+++ 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/AuditClient.java
@@ -56,6 +56,23 @@ public class AuditClient {
         return response.getData();
     }
 
+    /**
+     * Query audit data for list by condition
+     *
+     * @param request The audit request of query condition
+     * @return The result of query
+     */
+    public List<AuditVO> listAll(AuditRequest request) {
+        Preconditions.expectNotNull(request, "request cannot be null");
+        Preconditions.expectNotBlank(request.getInlongGroupId(), 
ErrorCodeEnum.INVALID_PARAMETER,
+                "inlong group id cannot be empty");
+        Preconditions.expectNotBlank(request.getInlongStreamId(), 
ErrorCodeEnum.INVALID_PARAMETER,
+                "inlong stream id cannot be empty");
+        Response<List<AuditVO>> response = 
ClientUtils.executeHttpCall(auditApi.listAll(request));
+        ClientUtils.assertRespSuccess(response);
+        return response.getData();
+    }
+
     /**
      * Refresh the base item of audit cache.
      *
diff --git 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/AuditApi.java
 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/AuditApi.java
index ab12f7608f..37fd3d60c8 100644
--- 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/AuditApi.java
+++ 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/AuditApi.java
@@ -32,6 +32,9 @@ public interface AuditApi {
     @POST("audit/list")
     Call<Response<List<AuditVO>>> list(@Body AuditRequest auditRequest);
 
+    @POST("audit/listAll")
+    Call<Response<List<AuditVO>>> listAll(@Body AuditRequest auditRequest);
+
     @POST("audit/refreshCache")
     Call<Response<Boolean>> refreshCache();
 
diff --git 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/AuditEntityMapper.java
 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/AuditEntityMapper.java
index 62330e6025..6f1a31dd9a 100644
--- 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/AuditEntityMapper.java
+++ 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/AuditEntityMapper.java
@@ -32,16 +32,16 @@ public interface AuditEntityMapper {
      * @param groupId The groupId of inlong
      * @param streamId The streamId of inlong
      * @param auditId The auditId of inlong
-     * @param sDate The start date
-     * @param eDate The end date
+     * @param startDate The start date
+     * @param endData The end date
      * @param format The format such as '%Y-%m-%d %H:%i:00'
      * @return The result of query
      */
     List<Map<String, Object>> sumByLogTs(@Param(value = "groupId") String 
groupId,
             @Param(value = "streamId") String streamId,
             @Param(value = "auditId") String auditId,
-            @Param(value = "sDate") String sDate,
-            @Param(value = "eDate") String eDate,
+            @Param(value = "startDate") String startDate,
+            @Param(value = "endData") String endData,
             @Param(value = "format") String format);
 
     /**
@@ -49,14 +49,20 @@ public interface AuditEntityMapper {
      *
      * @param ip ip
      * @param auditId The auditId of inlong
-     * @param sDate The start date
-     * @param eDate The end date
+     * @param startDate The start date
+     * @param endData The end date
      * @param format The format such as '%Y-%m-%d %H:%i:00'
      * @return The result of query
      */
     List<Map<String, Object>> sumByLogTsAndIp(@Param(value = "ip") String ip,
             @Param(value = "auditId") String auditId,
-            @Param(value = "sDate") String sDate,
-            @Param(value = "eDate") String eDate,
+            @Param(value = "startDate") String startDate,
+            @Param(value = "endData") String endData,
             @Param(value = "format") String format);
+
+    List<Map<String, Object>> sumGroupByIp(@Param(value = "groupId") String 
groupId,
+            @Param(value = "streamId") String streamId,
+            @Param(value = "auditId") String auditId,
+            @Param(value = "startDate") String startDate,
+            @Param(value = "endData") String endData);
 }
\ No newline at end of file
diff --git 
a/inlong-manager/manager-dao/src/main/resources/mappers/AuditEntityMapper.xml 
b/inlong-manager/manager-dao/src/main/resources/mappers/AuditEntityMapper.xml
index e2090204ee..63d6af6c73 100644
--- 
a/inlong-manager/manager-dao/src/main/resources/mappers/AuditEntityMapper.xml
+++ 
b/inlong-manager/manager-dao/src/main/resources/mappers/AuditEntityMapper.xml
@@ -41,20 +41,28 @@
         <result column="log_ts" property="logTs" jdbcType="VARCHAR"/>
         <result column="total" property="total" jdbcType="BIGINT"/>
         <result column="total_delay" property="totalDelay" jdbcType="BIGINT"/>
-        <result column="total_size" property="totalSize" jdbcType="BIGINT"/>
+    </resultMap>
+
+    <resultMap id="SumGroupByIdResultMap" type="java.util.Map">
+        <result column="inlong_group_id" property="inlongGroupId" 
jdbcType="VARCHAR"/>
+        <result column="inlong_stream_id" property="inlongStreamId" 
jdbcType="VARCHAR"/>
+        <result column="log_ts" property="logTs" jdbcType="VARCHAR"/>
+        <result column="ip" property="ip" jdbcType="VARCHAR"/>
+        <result column="total" property="total" jdbcType="BIGINT"/>
+        <result column="total_delay" property="totalDelay" jdbcType="BIGINT"/>
     </resultMap>
 
     <select id="sumByLogTs" resultMap="SumByLogTsResultMap">
         select inlong_group_id, inlong_stream_id, date_format(log_ts, 
#{format, jdbcType=VARCHAR}) as log_ts, sum(`count`) as total, sum(`delay`) as 
total_delay, sum(`size`) as total_size
         from (
-            select distinct ip, docker_id, thread_id, sdk_ts, packet_id, 
log_ts, inlong_group_id, inlong_stream_id, audit_id, `count`, `size`, `delay`
-            from apache_inlong_audit.audit_data
-            where inlong_group_id = #{groupId,jdbcType=VARCHAR}
-              and inlong_stream_id = #{streamId,jdbcType=VARCHAR}
-              and audit_id = #{auditId,jdbcType=VARCHAR}
-              and log_ts &gt;= #{sDate, jdbcType=VARCHAR}
-              and log_ts &lt; #{eDate, jdbcType=VARCHAR}
-            ) as sub
+                 select distinct ip, docker_id, thread_id, sdk_ts, packet_id, 
log_ts, inlong_group_id, inlong_stream_id, audit_id, `count`, `size`, `delay`
+                 from apache_inlong_audit.audit_data
+                 where inlong_group_id = #{groupId,jdbcType=VARCHAR}
+                   and inlong_stream_id = #{streamId,jdbcType=VARCHAR}
+                   and audit_id = #{auditId,jdbcType=VARCHAR}
+                   and log_ts &gt;= #{startDate, jdbcType=VARCHAR}
+                   and log_ts &lt; #{endData, jdbcType=VARCHAR}
+             ) as sub
         group by log_ts, inlong_group_id, inlong_stream_id
         order by log_ts
     </select>
@@ -66,10 +74,30 @@
                  from apache_inlong_audit.audit_data
                  where ip = #{ip,jdbcType=VARCHAR}
                    and audit_id = #{auditId,jdbcType=VARCHAR}
-                   and log_ts &gt;= #{sDate, jdbcType=VARCHAR}
-                   and log_ts &lt; #{eDate, jdbcType=VARCHAR}
+                   and log_ts &gt;= #{startDate, jdbcType=VARCHAR}
+                   and log_ts &lt; #{endData, jdbcType=VARCHAR}
              ) as sub
         group by log_ts, inlong_group_id, inlong_stream_id
         order by log_ts
     </select>
+    <select id="sumGroupByIp" resultMap="SumGroupByIdResultMap">
+        select inlong_group_id, inlong_stream_id, ip, sum(`count`) as total, 
sum(`delay`) as total_delay, sum(`size`) as total_size
+        from (
+                 select distinct ip, docker_id, thread_id, sdk_ts, packet_id, 
log_ts, inlong_group_id, inlong_stream_id, audit_id, `count`, `size`, `delay`
+                 from apache_inlong_audit.audit_data
+                 <where>
+                     audit_id = #{auditId,jdbcType=VARCHAR}
+                     and log_ts &gt;= #{startDate, jdbcType=VARCHAR}
+                     and log_ts &lt; #{endData, jdbcType=VARCHAR}
+                     <if test="ip != null and ip != ''">
+                         and ip = #{ip,jdbcType=VARCHAR}
+                     </if>
+                     <if test="ip == null or ip == ''">
+                         and inlong_group_id = #{groupId,jdbcType=VARCHAR}
+                         and inlong_stream_id = #{streamId,jdbcType=VARCHAR}
+                     </if>
+                 </where>
+             ) as sub
+        group by inlong_group_id, inlong_stream_id, ip
+    </select>
 </mapper>
\ No newline at end of file
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditInfo.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditInfo.java
index 8b15edb3ac..71e74d4c9a 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditInfo.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditInfo.java
@@ -32,6 +32,9 @@ public class AuditInfo {
     @ApiModelProperty(value = "inlong stream id")
     private String inlongStreamId;
 
+    @ApiModelProperty(value = "ip")
+    private String ip;
+
     @ApiModelProperty(value = "Audit log timestamp")
     private String logTs;
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AuditService.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AuditService.java
index e37277aefb..08991917b6 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AuditService.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AuditService.java
@@ -38,6 +38,14 @@ public interface AuditService {
      */
     List<AuditVO> listByCondition(AuditRequest request) throws Exception;
 
+    /**
+     * Query audit data for list by condition
+     *
+     * @param request The audit request of query condition
+     * @return The result of query
+     */
+    List<AuditVO> listAll(AuditRequest request) throws Exception;
+
     List<AuditBaseResponse> getAuditBases();
 
     /**
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
index f8ee0f618e..264911fb60 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
@@ -333,7 +333,7 @@ public class AuditServiceImpl implements AuditService {
                 }
             } else if (AuditQuerySource.CLICKHOUSE == querySource) {
                 try (Connection connection = config.getCkConnection();
-                        PreparedStatement statement = 
getAuditCkStatement(connection, groupId, streamId,
+                        PreparedStatement statement = 
getAuditCkStatementGroupByLogTs(connection, groupId, streamId,
                                 request.getIp(), auditId,
                                 request.getStartDate(),
                                 request.getEndDate());
@@ -358,6 +358,60 @@ public class AuditServiceImpl implements AuditService {
         return aggregateByTimeDim(result, request.getTimeStaticsDim());
     }
 
+    @Override
+    public List<AuditVO> listAll(AuditRequest request) throws Exception {
+        List<AuditVO> result = new ArrayList<>();
+        AuditQuerySource querySource = 
AuditQuerySource.valueOf(auditQuerySource);
+        for (String auditId : request.getAuditIds()) {
+            AuditBaseEntity auditBaseEntity = auditItemMap.get(auditId);
+            String auditName = "";
+            if (auditBaseEntity != null) {
+                auditName = auditBaseEntity.getName();
+            }
+            if (AuditQuerySource.MYSQL == querySource) {
+                // Support min agg at now
+                DateTime endDate = 
DAY_DATE_FORMATTER.parseDateTime(request.getEndDate());
+                String endDateStr = 
endDate.plusDays(1).toString(DAY_DATE_FORMATTER);
+                List<Map<String, Object>> sumList = 
auditEntityMapper.sumGroupByIp(
+                        request.getInlongGroupId(), 
request.getInlongStreamId(), auditId, request.getStartDate(),
+                        endDateStr);
+                List<AuditInfo> auditSet = sumList.stream().map(s -> {
+                    AuditInfo vo = new AuditInfo();
+                    vo.setInlongGroupId((String) s.get("inlongGroupId"));
+                    vo.setInlongStreamId((String) s.get("inlongStreamId"));
+                    vo.setLogTs((String) s.get("logTs"));
+                    vo.setIp((String) s.get("ip"));
+                    vo.setCount(((BigDecimal) s.get("total")).longValue());
+                    vo.setDelay(((BigDecimal) 
s.get("totalDelay")).longValue());
+                    vo.setSize(((BigDecimal) s.get("totalSize")).longValue());
+                    return vo;
+                }).collect(Collectors.toList());
+                result.add(new AuditVO(auditId, auditName, auditSet, null));
+            } else if (AuditQuerySource.CLICKHOUSE == querySource) {
+                try (Connection connection = config.getCkConnection();
+                        PreparedStatement statement = 
getAuditCkStatementGroupByIp(connection,
+                                request.getInlongGroupId(), 
request.getInlongStreamId(), request.getIp(), auditId,
+                                request.getStartDate(), request.getEndDate());
+
+                        ResultSet resultSet = statement.executeQuery()) {
+                    List<AuditInfo> auditSet = new ArrayList<>();
+                    while (resultSet.next()) {
+                        AuditInfo vo = new AuditInfo();
+                        
vo.setInlongGroupId(resultSet.getString("inlong_group_id"));
+                        
vo.setInlongStreamId(resultSet.getString("inlong_stream_id"));
+                        vo.setIp(resultSet.getString("ip"));
+                        vo.setCount(resultSet.getLong("total"));
+                        vo.setDelay(resultSet.getLong("total_delay"));
+                        vo.setSize(resultSet.getLong("total_size"));
+                        auditSet.add(vo);
+                    }
+                    result.add(new AuditVO(auditId, auditName, auditSet, 
null));
+                }
+            }
+        }
+        return result;
+    }
+
     @Override
     public List<AuditBaseResponse> getAuditBases() {
         List<AuditBaseEntity> auditBaseEntityList = 
auditBaseMapper.selectAll();
@@ -430,7 +484,8 @@ public class AuditServiceImpl implements AuditService {
      * @param endDate The en datetime of request
      * @return The clickhouse Statement
      */
-    private PreparedStatement getAuditCkStatement(Connection connection, 
String groupId, String streamId, String ip,
+    private PreparedStatement getAuditCkStatementGroupByLogTs(Connection 
connection, String groupId, String streamId,
+            String ip,
             String auditId, String startDate, String endDate) throws 
SQLException {
         String start = 
DAY_DATE_FORMATTER.parseDateTime(startDate).toString(SECOND_FORMAT);
         String end = 
DAY_DATE_FORMATTER.parseDateTime(endDate).plusDays(1).toString(SECOND_FORMAT);
@@ -466,6 +521,39 @@ public class AuditServiceImpl implements AuditService {
         return statement;
     }
 
+    private PreparedStatement getAuditCkStatementGroupByIp(Connection 
connection, String groupId,
+            String streamId, String ip, String auditId, String startDate, 
String endDate) throws SQLException {
+
+        if (StringUtils.isNotBlank(ip)) {
+            return getAuditCkStatementByIpGroupByIp(connection, auditId, ip, 
startDate, endDate);
+        }
+        // Query results are duplicated according to all fields.
+        String subQuery = new SQL()
+                .SELECT_DISTINCT("ip", "docker_id", "thread_id", "sdk_ts", 
"packet_id", "log_ts", "inlong_group_id",
+                        "inlong_stream_id", "audit_id", "count", "size", 
"delay")
+                .FROM("audit_data")
+                .WHERE("inlong_group_id = ?")
+                .WHERE("inlong_stream_id = ?")
+                .WHERE("audit_id = ?")
+                .WHERE("log_ts >= ?")
+                .WHERE("log_ts < ?")
+                .toString();
+
+        String sql = new SQL()
+                .SELECT("inlong_group_id", "inlong_stream_id", "sum(count) as 
total", "ip",
+                        "sum(delay) as total_delay", "sum(size) as total_size")
+                .FROM("(" + subQuery + ") as sub")
+                .GROUP_BY("inlong_group_id", "inlong_stream_id", "ip")
+                .toString();
+        PreparedStatement statement = connection.prepareStatement(sql);
+        statement.setString(1, groupId);
+        statement.setString(2, streamId);
+        statement.setString(3, auditId);
+        statement.setString(4, startDate);
+        statement.setString(5, endDate);
+        return statement;
+    }
+
     /**
      * Aggregate by time dim
      */
@@ -578,4 +666,30 @@ public class AuditServiceImpl implements AuditService {
         return statement;
     }
 
+    private PreparedStatement getAuditCkStatementByIpGroupByIp(Connection 
connection, String auditId, String ip,
+            String startDate, String endDate) throws SQLException {
+        String subQuery = new SQL()
+                .SELECT_DISTINCT("ip", "docker_id", "thread_id", "sdk_ts", 
"packet_id", "log_ts", "inlong_group_id",
+                        "inlong_stream_id", "audit_id", "count", "size", 
"delay")
+                .FROM("audit_data")
+                .WHERE("ip = ?")
+                .WHERE("audit_id = ?")
+                .WHERE("log_ts >= ?")
+                .WHERE("log_ts < ?")
+                .toString();
+
+        String sql = new SQL()
+                .SELECT("inlong_group_id", "inlong_stream_id", "ip", 
"sum(count) as total",
+                        "sum(delay) as total_delay", "sum(size) as total_size")
+                .FROM("(" + subQuery + ") as sub")
+                .GROUP_BY("inlong_group_id", "inlong_stream_id", "ip")
+                .toString();
+        PreparedStatement statement = connection.prepareStatement(sql);
+        statement.setString(1, ip);
+        statement.setString(2, auditId);
+        statement.setString(3, startDate);
+        statement.setString(4, endDate);
+        return statement;
+    }
+
 }

Reply via email to