This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 26f53726a0 [INLONG-8836][Audit] Add audit_tag information to
distinguish data sources and data targets (#8837)
26f53726a0 is described below
commit 26f53726a06dd52e6e4bf1eba810bf295c5cc1f5
Author: doleyzi <[email protected]>
AuthorDate: Tue Sep 5 18:56:17 2023 +0800
[INLONG-8836][Audit] Add audit_tag information to distinguish data sources
and data targets (#8837)
---
.../apache/inlong/audit/protocol/AuditData.java | 8 +++++++
.../audit-common/src/main/proto/AuditApi.proto | 1 +
.../inlong/audit/source/ServerMessageHandler.java | 1 +
.../org/apache/inlong/audit/AuditOperator.java | 10 ++++++++-
.../inlong/audit/db/entities/AuditDataPo.java | 1 +
.../inlong/audit/db/entities/ClickHouseDataPo.java | 1 +
.../apache/inlong/audit/db/entities/ESDataPo.java | 3 ++-
.../inlong/audit/service/ClickHouseService.java | 14 +++++++-----
.../inlong/audit/service/ElasticsearchService.java | 8 +++++++
.../apache/inlong/audit/service/MySqlService.java | 1 +
inlong-audit/sql/apache_inlong_audit.sql | 1 +
.../sql/apache_inlong_audit_clickhouse.sql | 1 +
.../clickhouse-changes-1.9.0.sql} | 26 ++++------------------
.../mysql-changes-1.9.0.sql} | 26 ++++------------------
14 files changed, 50 insertions(+), 52 deletions(-)
diff --git
a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/protocol/AuditData.java
b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/protocol/AuditData.java
index ac8d596984..eb64cd00f1 100644
---
a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/protocol/AuditData.java
+++
b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/protocol/AuditData.java
@@ -28,6 +28,7 @@ public class AuditData {
private String inlongGroupId;
private String inlongStreamId;
private String auditId;
+ private String auditTag;
private long count;
private long size;
private long delay;
@@ -104,6 +105,13 @@ public class AuditData {
this.auditId = auditId;
}
+ public String getAuditTag() {
+ return auditTag;
+ }
+
+ public void setAuditTag(String auditTag) {
+ this.auditTag = auditTag;
+ }
public long getCount() {
return count;
}
diff --git a/inlong-audit/audit-common/src/main/proto/AuditApi.proto
b/inlong-audit/audit-common/src/main/proto/AuditApi.proto
index ce89b112b3..44bf881017 100644
--- a/inlong-audit/audit-common/src/main/proto/AuditApi.proto
+++ b/inlong-audit/audit-common/src/main/proto/AuditApi.proto
@@ -61,6 +61,7 @@ message AuditMessageBody {
uint64 count = 5;
uint64 size = 6;
int64 delay = 7;
+ string audit_tag = 8;
}
message AuditReply {
diff --git
a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/source/ServerMessageHandler.java
b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/source/ServerMessageHandler.java
index 3dcc53f1f7..4ad53136e0 100644
---
a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/source/ServerMessageHandler.java
+++
b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/source/ServerMessageHandler.java
@@ -166,6 +166,7 @@ public class ServerMessageHandler extends
ChannelInboundHandlerAdapter {
auditData.setLogTs(auditMessageBody.getLogTs());
auditData.setAuditId(auditMessageBody.getAuditId());
+ auditData.setAuditTag(auditMessageBody.getAuditTag());
auditData.setCount(auditMessageBody.getCount());
auditData.setDelay(auditMessageBody.getDelay());
auditData.setInlongGroupId(auditMessageBody.getInlongGroupId());
diff --git
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditOperator.java
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditOperator.java
index c4faf560cd..a89c30f6c0 100644
---
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditOperator.java
+++
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditOperator.java
@@ -46,6 +46,7 @@ public class AuditOperator {
private static final Logger LOGGER =
LoggerFactory.getLogger(AuditOperator.class);
private static final String FIELD_SEPARATORS = ":";
+ private static final String DEFAULT_AUDIT_TAG = "-1";
private static final int BATCH_NUM = 100;
private static final AuditOperator AUDIT_OPERATOR = new AuditOperator();
private static final ReentrantLock GLOBAL_LOCK = new ReentrantLock();
@@ -133,9 +134,14 @@ public class AuditOperator {
* Add audit data
*/
public void add(int auditID, String inlongGroupID, String inlongStreamID,
Long logTime, long count, long size) {
+ add(auditID, DEFAULT_AUDIT_TAG, inlongGroupID, inlongStreamID,
logTime, count, size);
+ }
+
+ public void add(int auditID, String auditTag, String inlongGroupID, String
inlongStreamID, Long logTime,
+ long count, long size) {
long delayTime = System.currentTimeMillis() - logTime;
String key = (logTime / PERIOD) + FIELD_SEPARATORS + inlongGroupID +
FIELD_SEPARATORS
- + inlongStreamID + FIELD_SEPARATORS + auditID;
+ + inlongStreamID + FIELD_SEPARATORS + auditID +
FIELD_SEPARATORS + auditTag;
addByKey(key, count, size, delayTime);
}
@@ -196,12 +202,14 @@ public class AuditOperator {
String inlongGroupID = keyArray[1];
String inlongStreamID = keyArray[2];
String auditID = keyArray[3];
+ String auditTag = keyArray[4];
StatInfo value = entry.getValue();
AuditApi.AuditMessageBody msgBody =
AuditApi.AuditMessageBody.newBuilder()
.setLogTs(logTime)
.setInlongGroupId(inlongGroupID)
.setInlongStreamId(inlongStreamID)
.setAuditId(auditID)
+ .setAuditTag(auditTag)
.setCount(value.count.get())
.setSize(value.size.get())
.setDelay(value.delay.get())
diff --git
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/AuditDataPo.java
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/AuditDataPo.java
index c6d5ebd133..1e4daa6e45 100644
---
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/AuditDataPo.java
+++
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/AuditDataPo.java
@@ -36,6 +36,7 @@ public class AuditDataPo {
private String inlongGroupId;
private String inlongStreamId;
private String auditId;
+ private String auditTag;
private Long count;
private Long size;
private Long delay;
diff --git
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/ClickHouseDataPo.java
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/ClickHouseDataPo.java
index 3280ca6b1b..9171b466e1 100644
---
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/ClickHouseDataPo.java
+++
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/ClickHouseDataPo.java
@@ -35,6 +35,7 @@ public class ClickHouseDataPo {
private String inlongGroupId;
private String inlongStreamId;
private String auditId;
+ private String auditTag;
private Long count;
private Long size;
private Long delay;
diff --git
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/ESDataPo.java
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/ESDataPo.java
index 9ec42eebcd..812fe51946 100644
---
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/ESDataPo.java
+++
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/ESDataPo.java
@@ -34,6 +34,7 @@ public class ESDataPo {
private String inlongGroupId;
private String inlongStreamId;
private String auditId;
+ private String auditTag;
private long count;
private long size;
private long delay;
@@ -41,7 +42,7 @@ public class ESDataPo {
public String getDocId() {
String docId = ip + dockerId + threadId + sdkTs + packetId + logTs +
inlongGroupId + inlongStreamId
- + auditId + count;
+ + auditId + auditTag + count;
return docId;
}
}
diff --git
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ClickHouseService.java
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ClickHouseService.java
index fa6caf202e..7cad1326ab 100644
---
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ClickHouseService.java
+++
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ClickHouseService.java
@@ -45,10 +45,10 @@ public class ClickHouseService implements InsertData,
AutoCloseable {
private static final Logger LOG =
LoggerFactory.getLogger(ClickHouseService.class);
public static final String INSERT_SQL = "insert into audit_data (ip,
docker_id, thread_id,\r\n"
+ " sdk_ts, packet_id, log_ts,\r\n"
- + " inlong_group_id, inlong_stream_id, audit_id,\r\n"
+ + " inlong_group_id, inlong_stream_id,
audit_id,audit_tag,\r\n"
+ " count, size, delay, \r\n"
+ " update_time)\r\n"
- + " values (?,?,?,?,?,?,?,?,?,?,?,?,?)";
+ + " values (?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
private ClickHouseConfig chConfig;
@@ -110,10 +110,11 @@ public class ClickHouseService implements InsertData,
AutoCloseable {
pstat.setString(7, data.getInlongGroupId());
pstat.setString(8, data.getInlongStreamId());
pstat.setString(9, data.getAuditId());
- pstat.setLong(10, data.getCount());
- pstat.setLong(11, data.getSize());
- pstat.setLong(12, data.getDelay());
- pstat.setTimestamp(13, data.getUpdateTime());
+ pstat.setString(10, data.getAuditTag());
+ pstat.setLong(11, data.getCount());
+ pstat.setLong(12, data.getSize());
+ pstat.setLong(13, data.getDelay());
+ pstat.setTimestamp(14, data.getUpdateTime());
pstat.addBatch();
this.batchCounter.decrementAndGet();
if (++counter >= chConfig.getBatchThreshold()) {
@@ -173,6 +174,7 @@ public class ClickHouseService implements InsertData,
AutoCloseable {
data.setSdkTs(new Timestamp(msgBody.getSdkTs()));
data.setLogTs(new Timestamp(msgBody.getLogTs()));
data.setAuditId(msgBody.getAuditId());
+ data.setAuditTag(msgBody.getAuditTag());
data.setCount(msgBody.getCount());
data.setDelay(msgBody.getDelay());
data.setInlongGroupId(msgBody.getInlongGroupId());
diff --git
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ElasticsearchService.java
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ElasticsearchService.java
index ebd297334a..ec0c203fc9 100644
---
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ElasticsearchService.java
+++
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ElasticsearchService.java
@@ -234,6 +234,13 @@ public class ElasticsearchService implements InsertData,
AutoCloseable {
}
builder.endObject();
}
+ {
+ builder.startObject("audit_tag");
+ {
+ builder.field("type", "keyword");
+ }
+ builder.endObject();
+ }
{
builder.startObject("inlong_group_id");
{
@@ -330,6 +337,7 @@ public class ElasticsearchService implements InsertData,
AutoCloseable {
esPo.setSdkTs(new Date(msgBody.getSdkTs()).getTime());
esPo.setLogTs(new Date(msgBody.getLogTs()));
esPo.setAuditId(msgBody.getAuditId());
+ esPo.setAuditTag(msgBody.getAuditTag());
esPo.setCount(msgBody.getCount());
esPo.setDelay(msgBody.getDelay());
esPo.setInlongGroupId(msgBody.getInlongGroupId());
diff --git
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/MySqlService.java
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/MySqlService.java
index 812d005d5b..b0fc283abe 100644
---
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/MySqlService.java
+++
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/MySqlService.java
@@ -44,6 +44,7 @@ public class MySqlService implements InsertData {
po.setSdkTs(new Date(msgBody.getSdkTs()));
po.setLogTs(new Date(msgBody.getLogTs()));
po.setAuditId(msgBody.getAuditId());
+ po.setAuditTag(msgBody.getAuditTag());
po.setCount(msgBody.getCount());
po.setDelay(msgBody.getDelay());
po.setInlongGroupId(msgBody.getInlongGroupId());
diff --git a/inlong-audit/sql/apache_inlong_audit.sql
b/inlong-audit/sql/apache_inlong_audit.sql
index 88784c49b1..2d10ca697c 100644
--- a/inlong-audit/sql/apache_inlong_audit.sql
+++ b/inlong-audit/sql/apache_inlong_audit.sql
@@ -42,6 +42,7 @@ CREATE TABLE IF NOT EXISTS `audit_data`
`inlong_group_id` varchar(100) NOT NULL DEFAULT '' COMMENT 'The target
inlong group id',
`inlong_stream_id` varchar(100) NOT NULL DEFAULT '' COMMENT 'The target
inlong stream id',
`audit_id` varchar(100) NOT NULL DEFAULT '' COMMENT 'Audit id',
+ `audit_tag` varchar(100) DEFAULT '' COMMENT 'Audit tag',
`count` BIGINT NOT NULL DEFAULT '0' COMMENT 'Message
count',
`size` BIGINT NOT NULL DEFAULT '0' COMMENT 'Message
size',
`delay` BIGINT NOT NULL DEFAULT '0' COMMENT 'Message
delay count',
diff --git a/inlong-audit/sql/apache_inlong_audit_clickhouse.sql
b/inlong-audit/sql/apache_inlong_audit_clickhouse.sql
index 07045910d5..936305c73b 100644
--- a/inlong-audit/sql/apache_inlong_audit_clickhouse.sql
+++ b/inlong-audit/sql/apache_inlong_audit_clickhouse.sql
@@ -36,6 +36,7 @@ CREATE TABLE IF NOT EXISTS `audit_data`
`inlong_group_id` String COMMENT 'The target inlong group id',
`inlong_stream_id` String COMMENT 'The target inlong stream id',
`audit_id` String COMMENT 'Audit id',
+ `audit_tag` String COMMENT 'Audit tag',
`count` Int64 COMMENT 'Message count',
`size` Int64 COMMENT 'Message size',
`delay` Int64 COMMENT 'Message delay',
diff --git
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/ClickHouseDataPo.java
b/inlong-audit/sql/clickhouse-changes-1.9.0.sql
similarity index 59%
copy from
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/ClickHouseDataPo.java
copy to inlong-audit/sql/clickhouse-changes-1.9.0.sql
index 3280ca6b1b..8bfe84d7fa 100644
---
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/ClickHouseDataPo.java
+++ b/inlong-audit/sql/clickhouse-changes-1.9.0.sql
@@ -15,29 +15,11 @@
* limitations under the License.
*/
-package org.apache.inlong.audit.db.entities;
+-- This is the SQL change file from version 1.8.0 to the current version 1.9.0.
+-- When upgrading to version 1.9.0, please execute the following SQL statements
in the ClickHouse database used by the Audit module.
-import lombok.Getter;
-import lombok.Setter;
+USE `apache_inlong_audit`;
-import java.sql.Timestamp;
+ALTER TABLE audit_data ADD COLUMN audit_tag String DEFAULT '' COMMENT 'Audit
tag' after `audit_id`;
-@Getter
-@Setter
-public class ClickHouseDataPo {
- private String ip;
- private String dockerId;
- private String threadId;
- private Timestamp sdkTs;
- private Long packetId;
- private Timestamp logTs;
- private String inlongGroupId;
- private String inlongStreamId;
- private String auditId;
- private Long count;
- private Long size;
- private Long delay;
- private Timestamp updateTime;
-
-}
diff --git
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/ClickHouseDataPo.java
b/inlong-audit/sql/mysql-changes-1.9.0.sql
similarity index 59%
copy from
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/ClickHouseDataPo.java
copy to inlong-audit/sql/mysql-changes-1.9.0.sql
index 3280ca6b1b..29ade6c935 100644
---
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/ClickHouseDataPo.java
+++ b/inlong-audit/sql/mysql-changes-1.9.0.sql
@@ -15,29 +15,11 @@
* limitations under the License.
*/
-package org.apache.inlong.audit.db.entities;
+-- This is the SQL change file from version 1.8.0 to the current version 1.9.0.
+-- When upgrading to version 1.9.0, please execute the following SQL statements
in the database (such as MySQL) used by the Audit module.
-import lombok.Getter;
-import lombok.Setter;
+USE `apache_inlong_audit`;
-import java.sql.Timestamp;
+ALTER TABLE audit_data ADD COLUMN audit_tag varchar(100) DEFAULT '' COMMENT
'Audit tag' after `audit_id`;
-@Getter
-@Setter
-public class ClickHouseDataPo {
- private String ip;
- private String dockerId;
- private String threadId;
- private Timestamp sdkTs;
- private Long packetId;
- private Timestamp logTs;
- private String inlongGroupId;
- private String inlongStreamId;
- private String auditId;
- private Long count;
- private Long size;
- private Long delay;
- private Timestamp updateTime;
-
-}