This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 26f53726a0 [INLONG-8836][Audit] Add audit_tag information to 
distinguish data sources and data targets  (#8837)
26f53726a0 is described below

commit 26f53726a06dd52e6e4bf1eba810bf295c5cc1f5
Author: doleyzi <[email protected]>
AuthorDate: Tue Sep 5 18:56:17 2023 +0800

    [INLONG-8836][Audit] Add audit_tag information to distinguish data sources 
and data targets  (#8837)
---
 .../apache/inlong/audit/protocol/AuditData.java    |  8 +++++++
 .../audit-common/src/main/proto/AuditApi.proto     |  1 +
 .../inlong/audit/source/ServerMessageHandler.java  |  1 +
 .../org/apache/inlong/audit/AuditOperator.java     | 10 ++++++++-
 .../inlong/audit/db/entities/AuditDataPo.java      |  1 +
 .../inlong/audit/db/entities/ClickHouseDataPo.java |  1 +
 .../apache/inlong/audit/db/entities/ESDataPo.java  |  3 ++-
 .../inlong/audit/service/ClickHouseService.java    | 14 +++++++-----
 .../inlong/audit/service/ElasticsearchService.java |  8 +++++++
 .../apache/inlong/audit/service/MySqlService.java  |  1 +
 inlong-audit/sql/apache_inlong_audit.sql           |  1 +
 .../sql/apache_inlong_audit_clickhouse.sql         |  1 +
 .../clickhouse-changes-1.9.0.sql}                  | 26 ++++------------------
 .../mysql-changes-1.9.0.sql}                       | 26 ++++------------------
 14 files changed, 50 insertions(+), 52 deletions(-)

diff --git 
a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/protocol/AuditData.java
 
b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/protocol/AuditData.java
index ac8d596984..eb64cd00f1 100644
--- 
a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/protocol/AuditData.java
+++ 
b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/protocol/AuditData.java
@@ -28,6 +28,7 @@ public class AuditData {
     private String inlongGroupId;
     private String inlongStreamId;
     private String auditId;
+    private String auditTag;
     private long count;
     private long size;
     private long delay;
@@ -104,6 +105,13 @@ public class AuditData {
         this.auditId = auditId;
     }
 
+    public String getAuditTag() {
+        return auditTag;
+    }
+
+    public void setAuditTag(String auditTag) {
+        this.auditTag = auditTag;
+    }
     public long getCount() {
         return count;
     }
diff --git a/inlong-audit/audit-common/src/main/proto/AuditApi.proto 
b/inlong-audit/audit-common/src/main/proto/AuditApi.proto
index ce89b112b3..44bf881017 100644
--- a/inlong-audit/audit-common/src/main/proto/AuditApi.proto
+++ b/inlong-audit/audit-common/src/main/proto/AuditApi.proto
@@ -61,6 +61,7 @@ message AuditMessageBody {
   uint64 count = 5;
   uint64 size = 6;
   int64  delay = 7;
+  string audit_tag = 8;
 }
 
 message AuditReply {
diff --git 
a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/source/ServerMessageHandler.java
 
b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/source/ServerMessageHandler.java
index 3dcc53f1f7..4ad53136e0 100644
--- 
a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/source/ServerMessageHandler.java
+++ 
b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/source/ServerMessageHandler.java
@@ -166,6 +166,7 @@ public class ServerMessageHandler extends 
ChannelInboundHandlerAdapter {
 
             auditData.setLogTs(auditMessageBody.getLogTs());
             auditData.setAuditId(auditMessageBody.getAuditId());
+            auditData.setAuditTag(auditMessageBody.getAuditTag());
             auditData.setCount(auditMessageBody.getCount());
             auditData.setDelay(auditMessageBody.getDelay());
             auditData.setInlongGroupId(auditMessageBody.getInlongGroupId());
diff --git 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditOperator.java
 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditOperator.java
index c4faf560cd..a89c30f6c0 100644
--- 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditOperator.java
+++ 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditOperator.java
@@ -46,6 +46,7 @@ public class AuditOperator {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(AuditOperator.class);
     private static final String FIELD_SEPARATORS = ":";
+    private static final String DEFAULT_AUDIT_TAG = "-1";
     private static final int BATCH_NUM = 100;
     private static final AuditOperator AUDIT_OPERATOR = new AuditOperator();
     private static final ReentrantLock GLOBAL_LOCK = new ReentrantLock();
@@ -133,9 +134,14 @@ public class AuditOperator {
      * Add audit data
      */
     public void add(int auditID, String inlongGroupID, String inlongStreamID, 
Long logTime, long count, long size) {
+        add(auditID, DEFAULT_AUDIT_TAG, inlongGroupID, inlongStreamID, 
logTime, count, size);
+    }
+
+    public void add(int auditID, String auditTag, String inlongGroupID, String 
inlongStreamID, Long logTime,
+            long count, long size) {
         long delayTime = System.currentTimeMillis() - logTime;
         String key = (logTime / PERIOD) + FIELD_SEPARATORS + inlongGroupID + 
FIELD_SEPARATORS
-                + inlongStreamID + FIELD_SEPARATORS + auditID;
+                + inlongStreamID + FIELD_SEPARATORS + auditID + 
FIELD_SEPARATORS + auditTag;
         addByKey(key, count, size, delayTime);
     }
 
@@ -196,12 +202,14 @@ public class AuditOperator {
             String inlongGroupID = keyArray[1];
             String inlongStreamID = keyArray[2];
             String auditID = keyArray[3];
+            String auditTag = keyArray[4];
             StatInfo value = entry.getValue();
             AuditApi.AuditMessageBody msgBody = 
AuditApi.AuditMessageBody.newBuilder()
                     .setLogTs(logTime)
                     .setInlongGroupId(inlongGroupID)
                     .setInlongStreamId(inlongStreamID)
                     .setAuditId(auditID)
+                    .setAuditTag(auditTag)
                     .setCount(value.count.get())
                     .setSize(value.size.get())
                     .setDelay(value.delay.get())
diff --git 
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/AuditDataPo.java
 
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/AuditDataPo.java
index c6d5ebd133..1e4daa6e45 100644
--- 
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/AuditDataPo.java
+++ 
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/AuditDataPo.java
@@ -36,6 +36,7 @@ public class AuditDataPo {
     private String inlongGroupId;
     private String inlongStreamId;
     private String auditId;
+    private String auditTag;
     private Long count;
     private Long size;
     private Long delay;
diff --git 
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/ClickHouseDataPo.java
 
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/ClickHouseDataPo.java
index 3280ca6b1b..9171b466e1 100644
--- 
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/ClickHouseDataPo.java
+++ 
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/ClickHouseDataPo.java
@@ -35,6 +35,7 @@ public class ClickHouseDataPo {
     private String inlongGroupId;
     private String inlongStreamId;
     private String auditId;
+    private String auditTag;
     private Long count;
     private Long size;
     private Long delay;
diff --git 
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/ESDataPo.java
 
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/ESDataPo.java
index 9ec42eebcd..812fe51946 100644
--- 
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/ESDataPo.java
+++ 
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/ESDataPo.java
@@ -34,6 +34,7 @@ public class ESDataPo {
     private String inlongGroupId;
     private String inlongStreamId;
     private String auditId;
+    private String auditTag;
     private long count;
     private long size;
     private long delay;
@@ -41,7 +42,7 @@ public class ESDataPo {
 
     public String getDocId() {
         String docId = ip + dockerId + threadId + sdkTs + packetId + logTs + 
inlongGroupId + inlongStreamId
-                + auditId + count;
+                + auditId + auditTag + count;
         return docId;
     }
 }
diff --git 
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ClickHouseService.java
 
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ClickHouseService.java
index fa6caf202e..7cad1326ab 100644
--- 
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ClickHouseService.java
+++ 
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ClickHouseService.java
@@ -45,10 +45,10 @@ public class ClickHouseService implements InsertData, 
AutoCloseable {
     private static final Logger LOG = 
LoggerFactory.getLogger(ClickHouseService.class);
     public static final String INSERT_SQL = "insert into audit_data (ip, 
docker_id, thread_id,\r\n"
             + "      sdk_ts, packet_id, log_ts,\r\n"
-            + "      inlong_group_id, inlong_stream_id, audit_id,\r\n"
+            + "      inlong_group_id, inlong_stream_id, 
audit_id,audit_tag,\r\n"
             + "      count, size, delay, \r\n"
             + "      update_time)\r\n"
-            + "    values (?,?,?,?,?,?,?,?,?,?,?,?,?)";
+            + "    values (?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
 
     private ClickHouseConfig chConfig;
 
@@ -110,10 +110,11 @@ public class ClickHouseService implements InsertData, 
AutoCloseable {
                 pstat.setString(7, data.getInlongGroupId());
                 pstat.setString(8, data.getInlongStreamId());
                 pstat.setString(9, data.getAuditId());
-                pstat.setLong(10, data.getCount());
-                pstat.setLong(11, data.getSize());
-                pstat.setLong(12, data.getDelay());
-                pstat.setTimestamp(13, data.getUpdateTime());
+                pstat.setString(10, data.getAuditTag());
+                pstat.setLong(11, data.getCount());
+                pstat.setLong(12, data.getSize());
+                pstat.setLong(13, data.getDelay());
+                pstat.setTimestamp(14, data.getUpdateTime());
                 pstat.addBatch();
                 this.batchCounter.decrementAndGet();
                 if (++counter >= chConfig.getBatchThreshold()) {
@@ -173,6 +174,7 @@ public class ClickHouseService implements InsertData, 
AutoCloseable {
         data.setSdkTs(new Timestamp(msgBody.getSdkTs()));
         data.setLogTs(new Timestamp(msgBody.getLogTs()));
         data.setAuditId(msgBody.getAuditId());
+        data.setAuditTag(msgBody.getAuditTag());
         data.setCount(msgBody.getCount());
         data.setDelay(msgBody.getDelay());
         data.setInlongGroupId(msgBody.getInlongGroupId());
diff --git 
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ElasticsearchService.java
 
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ElasticsearchService.java
index ebd297334a..ec0c203fc9 100644
--- 
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ElasticsearchService.java
+++ 
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ElasticsearchService.java
@@ -234,6 +234,13 @@ public class ElasticsearchService implements InsertData, 
AutoCloseable {
                 }
                 builder.endObject();
             }
+            {
+                builder.startObject("audit_tag");
+                {
+                    builder.field("type", "keyword");
+                }
+                builder.endObject();
+            }
             {
                 builder.startObject("inlong_group_id");
                 {
@@ -330,6 +337,7 @@ public class ElasticsearchService implements InsertData, 
AutoCloseable {
         esPo.setSdkTs(new Date(msgBody.getSdkTs()).getTime());
         esPo.setLogTs(new Date(msgBody.getLogTs()));
         esPo.setAuditId(msgBody.getAuditId());
+        esPo.setAuditTag(msgBody.getAuditTag());
         esPo.setCount(msgBody.getCount());
         esPo.setDelay(msgBody.getDelay());
         esPo.setInlongGroupId(msgBody.getInlongGroupId());
diff --git 
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/MySqlService.java
 
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/MySqlService.java
index 812d005d5b..b0fc283abe 100644
--- 
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/MySqlService.java
+++ 
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/MySqlService.java
@@ -44,6 +44,7 @@ public class MySqlService implements InsertData {
         po.setSdkTs(new Date(msgBody.getSdkTs()));
         po.setLogTs(new Date(msgBody.getLogTs()));
         po.setAuditId(msgBody.getAuditId());
+        po.setAuditTag(msgBody.getAuditTag());
         po.setCount(msgBody.getCount());
         po.setDelay(msgBody.getDelay());
         po.setInlongGroupId(msgBody.getInlongGroupId());
diff --git a/inlong-audit/sql/apache_inlong_audit.sql 
b/inlong-audit/sql/apache_inlong_audit.sql
index 88784c49b1..2d10ca697c 100644
--- a/inlong-audit/sql/apache_inlong_audit.sql
+++ b/inlong-audit/sql/apache_inlong_audit.sql
@@ -42,6 +42,7 @@ CREATE TABLE IF NOT EXISTS `audit_data`
     `inlong_group_id`  varchar(100) NOT NULL DEFAULT '' COMMENT 'The target 
inlong group id',
     `inlong_stream_id` varchar(100) NOT NULL DEFAULT '' COMMENT 'The target 
inlong stream id',
     `audit_id`         varchar(100) NOT NULL DEFAULT '' COMMENT 'Audit id',
+    `audit_tag`        varchar(100) DEFAULT '' COMMENT 'Audit tag',
     `count`            BIGINT       NOT NULL DEFAULT '0' COMMENT 'Message 
count',
     `size`             BIGINT       NOT NULL DEFAULT '0' COMMENT 'Message 
size',
     `delay`            BIGINT       NOT NULL DEFAULT '0' COMMENT 'Message 
delay count',
diff --git a/inlong-audit/sql/apache_inlong_audit_clickhouse.sql 
b/inlong-audit/sql/apache_inlong_audit_clickhouse.sql
index 07045910d5..936305c73b 100644
--- a/inlong-audit/sql/apache_inlong_audit_clickhouse.sql
+++ b/inlong-audit/sql/apache_inlong_audit_clickhouse.sql
@@ -36,6 +36,7 @@ CREATE TABLE IF NOT EXISTS `audit_data`
     `inlong_group_id`  String COMMENT 'The target inlong group id',
     `inlong_stream_id` String COMMENT 'The target inlong stream id',
     `audit_id`         String COMMENT 'Audit id',
+    `audit_tag`        String COMMENT 'Audit tag',
     `count`            Int64 COMMENT 'Message count',
     `size`             Int64 COMMENT 'Message size',
     `delay`            Int64 COMMENT 'Message delay',
diff --git 
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/ClickHouseDataPo.java
 b/inlong-audit/sql/clickhouse-changes-1.9.0.sql
similarity index 59%
copy from 
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/ClickHouseDataPo.java
copy to inlong-audit/sql/clickhouse-changes-1.9.0.sql
index 3280ca6b1b..8bfe84d7fa 100644
--- 
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/ClickHouseDataPo.java
+++ b/inlong-audit/sql/clickhouse-changes-1.9.0.sql
@@ -15,29 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.audit.db.entities;
+-- This is the SQL change file from version 1.8.0 to the current version 1.9.0.
+-- When upgrading to version 1.9.0, please execute those SQLs in the DB (such 
as MySQL) used by the Manager module.
 
-import lombok.Getter;
-import lombok.Setter;
+USE `apache_inlong_audit`;
 
-import java.sql.Timestamp;
+ALTER TABLE audit_data ADD COLUMN audit_tag String DEFAULT '' COMMENT 'Audit 
tag' after `audit_id`;
 
-@Getter
-@Setter
-public class ClickHouseDataPo {
 
-    private String ip;
-    private String dockerId;
-    private String threadId;
-    private Timestamp sdkTs;
-    private Long packetId;
-    private Timestamp logTs;
-    private String inlongGroupId;
-    private String inlongStreamId;
-    private String auditId;
-    private Long count;
-    private Long size;
-    private Long delay;
-    private Timestamp updateTime;
-
-}
diff --git 
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/ClickHouseDataPo.java
 b/inlong-audit/sql/mysql-changes-1.9.0.sql
similarity index 59%
copy from 
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/ClickHouseDataPo.java
copy to inlong-audit/sql/mysql-changes-1.9.0.sql
index 3280ca6b1b..29ade6c935 100644
--- 
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/ClickHouseDataPo.java
+++ b/inlong-audit/sql/mysql-changes-1.9.0.sql
@@ -15,29 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.audit.db.entities;
+-- This is the SQL change file from version 1.8.0 to the current version 1.9.0.
+-- When upgrading to version 1.9.0, please execute those SQLs in the DB (such 
as MySQL) used by the Manager module.
 
-import lombok.Getter;
-import lombok.Setter;
+USE `apache_inlong_audit`;
 
-import java.sql.Timestamp;
+ALTER TABLE audit_data ADD COLUMN audit_tag varchar(100) DEFAULT '' COMMENT 
'Audit tag' after `audit_id`;
 
-@Getter
-@Setter
-public class ClickHouseDataPo {
 
-    private String ip;
-    private String dockerId;
-    private String threadId;
-    private Timestamp sdkTs;
-    private Long packetId;
-    private Timestamp logTs;
-    private String inlongGroupId;
-    private String inlongStreamId;
-    private String auditId;
-    private Long count;
-    private Long size;
-    private Long delay;
-    private Timestamp updateTime;
-
-}

Reply via email to