This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 23728055c3 [INLONG-10242][Audit] Audit SDK compatible with InLong
Manager to manage audit items (#10248)
23728055c3 is described below
commit 23728055c393cf30399a0f7e5b68490f419c56cc
Author: doleyzi <[email protected]>
AuthorDate: Wed May 22 12:47:36 2024 +0800
[INLONG-10242][Audit] Audit SDK compatible with InLong Manager to manage
audit items (#10248)
---
.../java/org/apache/inlong/audit/AuditIdEnum.java | 116 +++++++++++++------
.../org/apache/inlong/audit/AuditReporterImpl.java | 24 +++-
.../inlong/audit/entity/AuditInformation.java | 34 ++++++
.../org/apache/inlong/audit/entity/AuditType.java | 47 ++++++++
.../org/apache/inlong/audit/entity/FlowType.java | 39 +++++++
.../exceptions/AuditTypeNotExistException.java | 25 ++++
.../inlong/audit/util/AuditManagerUtils.java | 95 +++++++++++++++-
.../inlong/audit/util/AuditManagerUtilsTest.java | 126 +++++++++++++++++++++
8 files changed, 471 insertions(+), 35 deletions(-)
diff --git
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditIdEnum.java
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditIdEnum.java
index 51c9d2a1ab..d63bc3a17a 100644
---
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditIdEnum.java
+++
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditIdEnum.java
@@ -17,65 +17,96 @@
package org.apache.inlong.audit;
+import org.apache.inlong.audit.entity.AuditType;
+import org.apache.inlong.audit.entity.FlowType;
+import org.apache.inlong.audit.exceptions.AuditTypeNotExistException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.inlong.audit.entity.AuditType.AGENT;
+import static org.apache.inlong.audit.entity.AuditType.BINLOG;
+import static org.apache.inlong.audit.entity.AuditType.CLICKHOUSE;
+import static org.apache.inlong.audit.entity.AuditType.DATAPROXY;
+import static org.apache.inlong.audit.entity.AuditType.DORIS;
+import static org.apache.inlong.audit.entity.AuditType.ELASTICSEARCH;
+import static org.apache.inlong.audit.entity.AuditType.HBASE;
+import static org.apache.inlong.audit.entity.AuditType.HIVE;
+import static org.apache.inlong.audit.entity.AuditType.HUDI;
+import static org.apache.inlong.audit.entity.AuditType.ICEBERG;
+import static org.apache.inlong.audit.entity.AuditType.KUDU;
+import static org.apache.inlong.audit.entity.AuditType.MYSQL;
+import static org.apache.inlong.audit.entity.AuditType.POSTGRES;
+import static org.apache.inlong.audit.entity.AuditType.SDK;
+import static org.apache.inlong.audit.entity.AuditType.STARROCKS;
+import static org.apache.inlong.audit.entity.AuditType.TUBE;
+import static org.apache.inlong.audit.entity.FlowType.INPUT;
+import static org.apache.inlong.audit.entity.FlowType.OUTPUT;
+
/**
* Audit item management, each module is assigned two baseline audit item IDs,
namely receiving and sending.
*/
public enum AuditIdEnum {
- SDK_INPUT(1, "Received Audit Metrics for SDK"),
- SDK_OUTPUT(2, "Sent Audit Metrics for SDK"),
+ SDK_INPUT(1, INPUT, SDK, "Received Audit Metrics for SDK"),
+ SDK_OUTPUT(2, OUTPUT, SDK, "Sent Audit Metrics for SDK"),
- AGENT_INPUT(3, "Received Audit Metrics for Agent"),
- AGENT_OUTPUT(4, "Sent Audit Metrics for Agent"),
+ AGENT_INPUT(3, INPUT, AGENT, "Received Audit Metrics for Agent"),
+ AGENT_OUTPUT(4, OUTPUT, AGENT, "Sent Audit Metrics for Agent"),
- DATA_PROXY_INPUT(5, "Received Audit Metrics for DataProxy"),
- DATA_PROXY_OUTPUT(6, "Sent Audit Metrics for DataProxy"),
+ DATA_PROXY_INPUT(5, INPUT, DATAPROXY, "Received Audit Metrics for
DataProxy"),
+ DATA_PROXY_OUTPUT(6, OUTPUT, DATAPROXY, "Sent Audit Metrics for
DataProxy"),
- SORT_HIVE_INPUT(7, "Received Audit Metrics for Sort Hive"),
- SORT_HIVE_OUTPUT(8, "Sent Audit Metrics for Sort Hive"),
+ SORT_HIVE_INPUT(7, INPUT, HIVE, "Received Audit Metrics for Sort Hive"),
+ SORT_HIVE_OUTPUT(8, OUTPUT, HIVE, "Sent Audit Metrics for Sort Hive"),
- SORT_CLICKHOUSE_INPUT(9, "Received Audit Metrics for Sort ClickHouse"),
- SORT_CLICKHOUSE_OUTPUT(10, "Sent Audit Metrics for Sort ClickHouse"),
+ SORT_CLICKHOUSE_INPUT(9, INPUT, CLICKHOUSE, "Received Audit Metrics for
Sort ClickHouse"),
+ SORT_CLICKHOUSE_OUTPUT(10, OUTPUT, CLICKHOUSE, "Sent Audit Metrics for
Sort ClickHouse"),
- SORT_ELASTICSEARCH_INPUT(11, "Received Audit Metrics for Sort
ElasticSearch"),
- SORT_ELASTICSEARCH_OUTPUT(12, "Sent Audit Metrics for Sort ElasticSearch"),
+ SORT_ELASTICSEARCH_INPUT(11, INPUT, ELASTICSEARCH, "Received Audit Metrics
for Sort ElasticSearch"),
+ SORT_ELASTICSEARCH_OUTPUT(12, OUTPUT, ELASTICSEARCH, "Sent Audit Metrics
for Sort ElasticSearch"),
- SORT_STARROCKS_INPUT(13, "Received Audit Metrics for Sort StarRocks"),
- SORT_STARROCKS_OUTPUT(14, "Sent Audit Metrics for Sort StarRocks"),
+ SORT_STARROCKS_INPUT(13, INPUT, STARROCKS, "Received Audit Metrics for
Sort StarRocks"),
+ SORT_STARROCKS_OUTPUT(14, OUTPUT, STARROCKS, "Sent Audit Metrics for Sort
StarRocks"),
- SORT_HUDI_INPUT(15, "Received Audit Metrics for Sort HuDi"),
- SORT_HUDI_OUTPUT(16, "Sent Audit Metrics for Sort HuDi"),
+ SORT_HUDI_INPUT(15, INPUT, HUDI, "Received Audit Metrics for Sort HuDi"),
+ SORT_HUDI_OUTPUT(16, OUTPUT, HUDI, "Sent Audit Metrics for Sort HuDi"),
- SORT_ICEBERG_INPUT(17, "Received Audit Metrics for Sort Iceberg"),
- SORT_ICEBERG_OUTPUT(18, "Sent Audit Metrics for Sort Iceberg"),
+ SORT_ICEBERG_INPUT(17, INPUT, ICEBERG, "Received Audit Metrics for Sort
Iceberg"),
+ SORT_ICEBERG_OUTPUT(18, OUTPUT, ICEBERG, "Sent Audit Metrics for Sort
Iceberg"),
- SORT_HBASE_INPUT(19, "Received Audit Metrics for Sort HBase"),
- SORT_HBASE_OUTPUT(20, "Sent Audit Metrics for Sort HBase"),
+ SORT_HBASE_INPUT(19, INPUT, HBASE, "Received Audit Metrics for Sort
HBase"),
+ SORT_HBASE_OUTPUT(20, OUTPUT, HBASE, "Sent Audit Metrics for Sort HBase"),
- SORT_DORIS_INPUT(21, "Received Audit Metrics for Sort Doris"),
- SORT_DORIS_OUTPUT(22, "Sent Audit Metrics for Sort Doris"),
+ SORT_DORIS_INPUT(21, INPUT, DORIS, "Received Audit Metrics for Sort
Doris"),
+ SORT_DORIS_OUTPUT(22, OUTPUT, DORIS, "Sent Audit Metrics for Sort Doris"),
- SORT_KUDU_INPUT(25, "Received Audit Metrics for Sort Kudu"),
- SORT_KUDU_OUTPUT(26, "Sent Audit Metrics for Sort Kudu"),
+ SORT_KUDU_INPUT(25, INPUT, KUDU, "Received Audit Metrics for Sort Kudu"),
+ SORT_KUDU_OUTPUT(26, OUTPUT, KUDU, "Sent Audit Metrics for Sort Kudu"),
- SORT_POSTGRES_INPUT(27, "Received Audit Metrics for Sort Postgres"),
- SORT_POSTGRES_OUTPUT(28, "Sent Audit Metrics for Sort Postgres"),
+ SORT_POSTGRES_INPUT(27, INPUT, POSTGRES, "Received Audit Metrics for Sort
Postgres"),
+ SORT_POSTGRES_OUTPUT(28, OUTPUT, POSTGRES, "Sent Audit Metrics for Sort
Postgres"),
- SORT_BINLOG_INPUT(29, "Received Audit Metrics for Sort Binlog"),
- SORT_BINLOG_OUTPUT(30, "Sent Audit Metrics for Sort Binlog"),
+ SORT_BINLOG_INPUT(29, INPUT, BINLOG, "Received Audit Metrics for Sort
Binlog"),
+ SORT_BINLOG_OUTPUT(30, OUTPUT, BINLOG, "Sent Audit Metrics for Sort
Binlog"),
- SORT_TUBE_INPUT(33, "Received Audit Metrics for Sort Tube"),
- SORT_TUBE_OUTPUT(34, "Sent Audit Metrics for Sort Tube"),
+ SORT_TUBE_INPUT(33, INPUT, TUBE, "Received Audit Metrics for Sort Tube"),
+ SORT_TUBE_OUTPUT(34, OUTPUT, TUBE, "Sent Audit Metrics for Sort Tube"),
- SORT_MYSQL_INPUT(35, "Received Audit Metrics for Sort MySQL"),
- SORT_MYSQL_OUTPUT(36, "Sent Audit Metrics for Sort MySQL");
+ SORT_MYSQL_INPUT(35, INPUT, MYSQL, "Received Audit Metrics for Sort
MySQL"),
+ SORT_MYSQL_OUTPUT(36, OUTPUT, MYSQL, "Sent Audit Metrics for Sort MySQL");
+ private static final Logger LOGGER =
LoggerFactory.getLogger(AuditIdEnum.class);
private final int auditId;
private final String description;
+ private final FlowType flowType;
+ private final AuditType auditType;
- AuditIdEnum(int auditId, String description) {
+ AuditIdEnum(int auditId, FlowType flowType, AuditType auditType, String
description) {
this.auditId = auditId;
this.description = description;
+ this.flowType = flowType;
+ this.auditType = auditType;
}
public int getValue() {
@@ -85,4 +116,23 @@ public enum AuditIdEnum {
public String getDescription() {
return description;
}
+
+ public FlowType getFlowType() {
+ return flowType;
+ }
+
+ public AuditType getAuditType() {
+ return auditType;
+ }
+
+ public static AuditIdEnum getAuditId(String auditType, FlowType flowType) {
+ for (AuditIdEnum auditIdEnum : AuditIdEnum.values()) {
+ if (auditIdEnum.getFlowType() == flowType &&
+
auditType.equalsIgnoreCase(auditIdEnum.getAuditType().value())) {
+ return auditIdEnum;
+ }
+ }
+ LOGGER.error("Error Audit type: {}, flow type {}: ", auditType,
flowType);
+ throw new AuditTypeNotExistException(String.format("Audit type %s does
not exist", auditType));
+ }
}
diff --git
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java
index eafa82d5b3..1c8672748d 100644
---
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java
+++
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java
@@ -17,6 +17,8 @@
package org.apache.inlong.audit;
+import org.apache.inlong.audit.entity.AuditInformation;
+import org.apache.inlong.audit.entity.FlowType;
import org.apache.inlong.audit.loader.SocketAddressListLoader;
import org.apache.inlong.audit.protocol.AuditApi;
import org.apache.inlong.audit.send.SenderManager;
@@ -503,7 +505,7 @@ public class AuditReporterImpl implements Serializable {
}
/**
- * Generate Audit item IDs.
+ * Generate Audit item IDs.
*
* @param baseAuditId
* @param success
@@ -519,4 +521,24 @@ public class AuditReporterImpl implements Serializable {
boolean retry) {
return AuditManagerUtils.buildAuditId(baseAuditId, success,
isRealtime, discard, retry);
}
+
+ public AuditInformation buildAuditInformation(String auditType,
+ FlowType dataFlow,
+ boolean success,
+ boolean isRealtime,
+ boolean discard,
+ boolean retry) {
+ return AuditManagerUtils.buildAuditInformation(auditType, dataFlow,
success, isRealtime, discard, retry);
+ }
+ public List<AuditInformation> getAllAuditInformation() {
+ return AuditManagerUtils.getAllAuditInformation();
+ }
+
+ public List<AuditInformation> getAllAuditInformation(String auditType) {
+ return AuditManagerUtils.getAllAuditInformation(auditType);
+ }
+
+ public int getStartAuditIdForMetric() {
+ return AuditManagerUtils.getStartAuditIdForMetric();
+ }
}
diff --git
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/entity/AuditInformation.java
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/entity/AuditInformation.java
new file mode 100644
index 0000000000..2be1662c9f
--- /dev/null
+++
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/entity/AuditInformation.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.entity;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+/**
+ * The Audit information description currently only supports Chinese and
English.
+ * If other languages need to be added, it can be expanded.
+ */
+@Data
+@AllArgsConstructor
+public class AuditInformation {
+
+ private final int auditId;
+ private final String nameInEnglish;
+ private final String nameInChinese;
+}
diff --git
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/entity/AuditType.java
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/entity/AuditType.java
new file mode 100644
index 0000000000..6fb554b407
--- /dev/null
+++
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/entity/AuditType.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.entity;
+
+public enum AuditType {
+
+ SDK("SDK"),
+ AGENT("Agent"),
+ DATAPROXY("DataProxy"),
+ HIVE("Hive"),
+ CLICKHOUSE("ClickHouse"),
+ ELASTICSEARCH("ElasticSearch"),
+ STARROCKS("StarRocks"),
+ HUDI("HuDi"),
+ ICEBERG("Iceberg"),
+ HBASE("HBase"),
+ DORIS("Doris"),
+ KUDU("Kudu"),
+ POSTGRES("Postgres"),
+ BINLOG("Binlog"),
+ TUBE("Tube"),
+ MYSQL("MySQL");
+
+ private final String auditType;
+
+ AuditType(String auditType) {
+ this.auditType = auditType;
+ }
+ public String value() {
+ return auditType;
+ }
+}
diff --git
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/entity/FlowType.java
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/entity/FlowType.java
new file mode 100644
index 0000000000..aaf9883ae1
--- /dev/null
+++
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/entity/FlowType.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.entity;
+
+public enum FlowType {
+
+ INPUT("received", " 接收"),
+ OUTPUT("send", " 发送");
+ private final String nameInEnglish;
+ private final String nameInChinese;
+
+ FlowType(String nameInEnglish, String nameInChinese) {
+ this.nameInEnglish = nameInEnglish;
+ this.nameInChinese = nameInChinese;
+ }
+
+ public String getNameInEnglish() {
+ return nameInEnglish;
+ }
+
+ public String getNameInChinese() {
+ return nameInChinese;
+ }
+}
diff --git
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/exceptions/AuditTypeNotExistException.java
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/exceptions/AuditTypeNotExistException.java
new file mode 100644
index 0000000000..431b5332b5
--- /dev/null
+++
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/exceptions/AuditTypeNotExistException.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.exceptions;
+
+public class AuditTypeNotExistException extends RuntimeException {
+
+ public AuditTypeNotExistException(String message) {
+ super(message);
+ }
+}
diff --git
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditManagerUtils.java
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditManagerUtils.java
index b36ac9f1dd..8bc2c8e7a3 100644
---
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditManagerUtils.java
+++
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditManagerUtils.java
@@ -18,6 +18,14 @@
package org.apache.inlong.audit.util;
import org.apache.inlong.audit.AuditIdEnum;
+import org.apache.inlong.audit.entity.AuditInformation;
+import org.apache.inlong.audit.entity.FlowType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
/**
* Audit item ID generation rules: composed of basic audit item ID + extension
bits.
@@ -27,8 +35,10 @@ import org.apache.inlong.audit.AuditIdEnum;
*/
public class AuditManagerUtils {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(AuditManagerUtils.class);
public static final int AUDIT_SUFFIX_LENGTH = 16;
public static final int AUDIT_MAX_PREFIX_LENGTH = 14;
+ public static final String AUDIT_DESCRIPTION_JOINER = "_";
private static String buildSuccessAndFailureFlag(boolean success) {
return success ? "0" : "1";
@@ -55,7 +65,7 @@ public class AuditManagerUtils {
}
/**
- * Generate Audit item IDs.
+ * Generate Audit item IDs.
*
* @param baseAuditId
* @param success
@@ -76,6 +86,89 @@ public class AuditManagerUtils {
return Integer.parseInt(auditPreFix +
buildAuditIdSuffix(baseAuditId.getValue()), 2);
}
+ public static AuditInformation buildAuditInformation(String auditType,
+ FlowType flowType,
+ boolean success,
+ boolean isRealtime,
+ boolean discard,
+ boolean retry) {
+ String auditPreFix = buildSuccessAndFailureFlag(success) +
+ buildRealtimeFlag(isRealtime) +
+ buildDiscardFlag(discard) +
+ buildRetryFlag(retry);
+ AuditIdEnum baseAuditId = AuditIdEnum.getAuditId(auditType, flowType);
+ int auditId = Integer.parseInt(auditPreFix +
buildAuditIdSuffix(baseAuditId.getValue()), 2);
+ StringBuilder nameInEnglish = new StringBuilder()
+ .append(baseAuditId.getAuditType().value())
+ .append(AUDIT_DESCRIPTION_JOINER)
+ .append(flowType.getNameInEnglish())
+ .append(AUDIT_DESCRIPTION_JOINER);
+ StringBuilder nameInChinese = new StringBuilder()
+ .append(baseAuditId.getAuditType().value())
+ .append(flowType.getNameInChinese());
+
+ if (discard) {
+ nameInEnglish.append("discard").append(AUDIT_DESCRIPTION_JOINER);
+ nameInChinese.append("丢弃");
+ }
+ if (retry) {
+ nameInEnglish.append("retry").append(AUDIT_DESCRIPTION_JOINER);
+ nameInChinese.append("重试");
+ }
+ if (success) {
+ nameInEnglish.append("success");
+ nameInChinese.append("成功");
+ } else {
+ nameInEnglish.append("failed");
+ nameInChinese.append("失败");
+ }
+ if (!isRealtime) {
+ nameInEnglish.append("(CheckPoint)");
+ nameInChinese.append("(CheckPoint)");
+ }
+ return new AuditInformation(auditId, nameInEnglish.toString(),
nameInChinese.toString());
+ }
+
+ public static List<AuditInformation> getAllAuditInformation() {
+ List<AuditInformation> auditInformationList = new LinkedList<>();
+ for (AuditIdEnum auditIdEnum : AuditIdEnum.values()) {
+
auditInformationList.addAll(combineAuditInformation(auditIdEnum.getAuditType().value(),
+ auditIdEnum.getFlowType()));
+ }
+ return auditInformationList;
+ }
+
+ public static List<AuditInformation> getAllAuditInformation(String
auditType) {
+ List<AuditInformation> auditInformationList = new LinkedList<>();
+ for (AuditIdEnum auditIdEnum : AuditIdEnum.values()) {
+ if (!auditIdEnum.getAuditType().value().equals(auditType)) {
+ continue;
+ }
+
auditInformationList.addAll(combineAuditInformation(auditIdEnum.getAuditType().value(),
+ auditIdEnum.getFlowType()));
+ }
+ return auditInformationList;
+ }
+
+ private static List<AuditInformation> combineAuditInformation(String
auditType, FlowType flowType) {
+ List<AuditInformation> auditInformationList = new LinkedList<>();
+ boolean[] combinations = {true, false};
+ for (boolean success : combinations) {
+ for (boolean isRealtime : combinations) {
+ for (boolean discard : combinations) {
+ for (boolean retry : combinations) {
+ if (discard && retry) {
+ continue;
+ }
+ auditInformationList
+ .add(buildAuditInformation(auditType,
flowType, success, isRealtime, discard, retry));
+ }
+ }
+ }
+ }
+ return auditInformationList;
+ }
+
/**
* Get max Audit ID.
*
diff --git
a/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/AuditManagerUtilsTest.java
b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/AuditManagerUtilsTest.java
new file mode 100644
index 0000000000..0268826656
--- /dev/null
+++
b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/AuditManagerUtilsTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.util;
+
+import org.apache.inlong.audit.entity.AuditInformation;
+import org.apache.inlong.audit.entity.FlowType;
+
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.apache.inlong.audit.AuditIdEnum.AGENT_INPUT;
+import static org.apache.inlong.audit.AuditIdEnum.DATA_PROXY_INPUT;
+import static org.apache.inlong.audit.AuditIdEnum.SORT_HIVE_INPUT;
+import static org.apache.inlong.audit.AuditIdEnum.SORT_STARROCKS_INPUT;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class AuditManagerUtilsTest {
+
+ @Test
+ public void buildAuditInformation() {
+ String auditType = "Agent";
+ FlowType flowType = FlowType.INPUT;
+ boolean success = true;
+ boolean isRealtime = true;
+ boolean discard = false;
+ boolean retry = false;
+ AuditInformation auditInfo =
+ AuditManagerUtils.buildAuditInformation(auditType, flowType,
success, isRealtime, discard, retry);
+ assertEquals(AGENT_INPUT.getValue(), auditInfo.getAuditId());
+ assertEquals("Agent 接收成功", auditInfo.getNameInChinese());
+
+ auditType = "DataProxy";
+ auditInfo = AuditManagerUtils.buildAuditInformation(auditType,
flowType, success, isRealtime, discard, retry);
+ assertEquals(DATA_PROXY_INPUT.getValue(), auditInfo.getAuditId());
+ assertEquals("DataProxy 接收成功", auditInfo.getNameInChinese());
+
+ auditType = "Hive";
+ auditInfo = AuditManagerUtils.buildAuditInformation(auditType,
flowType, success, isRealtime, discard, retry);
+ assertEquals(SORT_HIVE_INPUT.getValue(), auditInfo.getAuditId());
+ assertEquals("Hive 接收成功", auditInfo.getNameInChinese());
+
+ auditType = "StarRocks";
+ auditInfo = AuditManagerUtils.buildAuditInformation(auditType,
flowType, success, isRealtime, discard, retry);
+ assertEquals(SORT_STARROCKS_INPUT.getValue(), auditInfo.getAuditId());
+ assertEquals("StarRocks 接收成功", auditInfo.getNameInChinese());
+
+ // Test the scenario of dataFlow case compatibility.
+ auditType = "agent";
+ auditInfo = AuditManagerUtils.buildAuditInformation(auditType,
flowType, success, isRealtime, discard, retry);
+ assertEquals(AGENT_INPUT.getValue(), auditInfo.getAuditId());
+ assertEquals("Agent 接收成功", auditInfo.getNameInChinese());
+
+ auditType = "dataProxy";
+ auditInfo = AuditManagerUtils.buildAuditInformation(auditType,
flowType, success, isRealtime, discard, retry);
+ assertEquals(DATA_PROXY_INPUT.getValue(), auditInfo.getAuditId());
+ assertEquals("DataProxy 接收成功", auditInfo.getNameInChinese());
+
+ auditType = "hive";
+ auditInfo = AuditManagerUtils.buildAuditInformation(auditType,
flowType, success, isRealtime, discard, retry);
+ assertEquals(SORT_HIVE_INPUT.getValue(), auditInfo.getAuditId());
+ assertEquals("Hive 接收成功", auditInfo.getNameInChinese());
+
+ auditType = "STARROCKS";
+ auditInfo = AuditManagerUtils.buildAuditInformation(auditType,
flowType, success, isRealtime, discard, retry);
+ assertEquals(SORT_STARROCKS_INPUT.getValue(), auditInfo.getAuditId());
+ assertEquals("StarRocks 接收成功", auditInfo.getNameInChinese());
+
+ // Test send failed audit items.
+ auditType = "Agent";
+ flowType = FlowType.OUTPUT;
+ success = false;
+ isRealtime = true;
+ discard = false;
+ retry = false;
+ auditInfo = AuditManagerUtils.buildAuditInformation(auditType,
flowType, success, isRealtime, discard, retry);
+ assertEquals("Agent 发送失败", auditInfo.getNameInChinese());
+
+ auditType = "DataProxy";
+ auditInfo = AuditManagerUtils.buildAuditInformation(auditType,
flowType, success, isRealtime, discard, retry);
+ assertEquals("DataProxy 发送失败", auditInfo.getNameInChinese());
+
+ auditType = "Hive";
+ auditInfo = AuditManagerUtils.buildAuditInformation(auditType,
flowType, success, isRealtime, discard, retry);
+ assertEquals("Hive 发送失败", auditInfo.getNameInChinese());
+
+ auditType = "StarRocks";
+ auditInfo = AuditManagerUtils.buildAuditInformation(auditType,
flowType, success, isRealtime, discard, retry);
+ assertEquals("StarRocks 发送失败", auditInfo.getNameInChinese());
+ }
+
+ @Test
+ public void getAllAuditInformation() {
+ List<AuditInformation> auditInfoList =
AuditManagerUtils.getAllAuditInformation();
+ System.out.println(auditInfoList);
+ assertTrue(auditInfoList.size() > 0);
+
+ auditInfoList.clear();
+ auditInfoList = AuditManagerUtils.getAllAuditInformation("Agent");
+
+ assertTrue(auditInfoList.size() > 0);
+ }
+
+ @Test
+ public void getStartAuditIdForMetric() {
+ int auditId = AuditManagerUtils.getStartAuditIdForMetric();
+ assertTrue(auditId > 0);
+ assertTrue(auditId <= (1 << 30));
+ }
+}