This is an automated email from the ASF dual-hosted git repository.
doleyzi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 9411dc1a18 [INLONG-11999][Audit] Add alert evaluation and periodic
audit check task (#12002)
9411dc1a18 is described below
commit 9411dc1a1805825762a940e57ce8c1630a9a4ad4
Author: Kafka <[email protected]>
AuthorDate: Mon Sep 15 10:08:28 2025 +0800
[INLONG-11999][Audit] Add alert evaluation and periodic audit check task
(#12002)
* [INLONG-11999][Audit] Add alert evaluation and periodic audit check task
* [INLONG-11999][Audit] Add alert evaluation and periodic audit check task
* [INLONG-11999][Audit] Add alert evaluation and periodic audit check task
* [INLONG-11999][Audit] Add alert evaluation and periodic audit check task
* [INLONG-11999][Audit] Add alert evaluation and periodic audit check task
* Update AuditToolMain.java
---------
Co-authored-by: doleyzi <[email protected]>
---
.../apache/inlong/audit/tool/AuditToolMain.java | 41 +++-
.../audit/tool/evaluator/AlertEvaluator.java | 112 +++++++++
.../audit/tool/reporter/OpenTelemetryReporter.java | 77 ++++++
.../inlong/audit/tool/task/AuditCheckTask.java | 146 +++++++++++
.../inlong/tool/evaluator/AlertEvaluatorTest.java | 266 +++++++++++++++++++++
.../tool/service/AuditMetricServiceTest.java | 47 ++--
.../inlong/tool/task/AuditCheckTaskTest.java | 139 +++++++++++
7 files changed, 796 insertions(+), 32 deletions(-)
diff --git
a/inlong-audit/audit-tool/src/main/java/org/apache/inlong/audit/tool/AuditToolMain.java
b/inlong-audit/audit-tool/src/main/java/org/apache/inlong/audit/tool/AuditToolMain.java
index 6508923127..a4c569a368 100644
---
a/inlong-audit/audit-tool/src/main/java/org/apache/inlong/audit/tool/AuditToolMain.java
+++
b/inlong-audit/audit-tool/src/main/java/org/apache/inlong/audit/tool/AuditToolMain.java
@@ -17,9 +17,48 @@
package org.apache.inlong.audit.tool;
+import org.apache.inlong.audit.tool.config.AppConfig;
+import org.apache.inlong.audit.tool.evaluator.AlertEvaluator;
+import org.apache.inlong.audit.tool.manager.AuditAlertRuleManager;
+import org.apache.inlong.audit.tool.reporter.PrometheusReporter;
+import org.apache.inlong.audit.tool.task.AuditCheckTask;
+import org.apache.inlong.audit.tool.util.AuditSQLUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
public class AuditToolMain {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(AuditToolMain.class);
+
public static void main(String[] args) {
+ // Load application configuration
+ AppConfig appConfig = new AppConfig();
+
+ // Initialize the alert rule manager and start its schedule
+ AuditAlertRuleManager auditAlertRuleManager = AuditAlertRuleManager.getInstance();
+ auditAlertRuleManager.init(appConfig);
+ auditAlertRuleManager.schedule();
+
+ // Initialize reporters
+ PrometheusReporter prometheusReporter = new PrometheusReporter();
+ prometheusReporter.init(appConfig.getPrometheusConfig());
+
+ // Database query initialization
+ AuditSQLUtil.initialize(appConfig.getProperties());
+
+ // Initialize the alert evaluator and start the periodic audit check
+ AlertEvaluator alertEvaluator = new AlertEvaluator(prometheusReporter, auditAlertRuleManager);
+ AuditCheckTask auditCheckTask = new AuditCheckTask(auditAlertRuleManager, alertEvaluator, appConfig);
+ auditCheckTask.start();
+
+ // Stop the check task when the JVM shuts down. A regular stop is not a failure,
+ // so it is logged at INFO rather than ERROR.
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ auditCheckTask.stop();
+ LOGGER.info("Audit Tool stopped.");
+ }));
+ LOGGER.info("Audit Tool started.");
}
-}
\ No newline at end of file
+}
diff --git
a/inlong-audit/audit-tool/src/main/java/org/apache/inlong/audit/tool/evaluator/AlertEvaluator.java
b/inlong-audit/audit-tool/src/main/java/org/apache/inlong/audit/tool/evaluator/AlertEvaluator.java
new file mode 100644
index 0000000000..13f1880837
--- /dev/null
+++
b/inlong-audit/audit-tool/src/main/java/org/apache/inlong/audit/tool/evaluator/AlertEvaluator.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.tool.evaluator;
+
+import org.apache.inlong.audit.tool.dto.AuditAlertCondition;
+import org.apache.inlong.audit.tool.dto.AuditAlertRule;
+import org.apache.inlong.audit.tool.entity.AuditMetric;
+import org.apache.inlong.audit.tool.manager.AuditAlertRuleManager;
+import org.apache.inlong.audit.tool.reporter.PrometheusReporter;
+
+import lombok.Getter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Objects;
+
+public class AlertEvaluator {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(AlertEvaluator.class);
+
+    private final PrometheusReporter prometheusReporter;
+    @Getter
+    private final AuditAlertRuleManager auditAlertRuleManager;
+
+    /**
+     * Creates an evaluator.
+     *
+     * @param prometheusReporter reporter whose audit metric is updated when a rule is hit
+     * @param auditAlertRuleManager rule manager, exposed through its getter
+     */
+    public AlertEvaluator(PrometheusReporter prometheusReporter,
+            AuditAlertRuleManager auditAlertRuleManager) {
+        this.prometheusReporter = prometheusReporter;
+        this.auditAlertRuleManager = auditAlertRuleManager;
+    }
+
+    /**
+     * Evaluates one alert rule against the given source and sink metrics.
+     *
+     * <p>Each source metric that belongs to the rule's group and stream is paired with every
+     * sink metric of the same group and stream. For each pair the relative difference
+     * {@code (sinkCount - sourceCount) / sourceCount} is compared with the rule's threshold
+     * using the rule's operator. On a hit the alert is logged and, if the reporter has an
+     * audit metric, that metric is updated with the difference.
+     *
+     * <p>Nothing is evaluated when either metric list or the rule is {@code null}, or when the
+     * rule has no condition or operator. Source metrics with a zero count are skipped because
+     * the relative difference is undefined for them. Unknown operators never hit.
+     *
+     * @param sourceMetrics metrics of the source audit id
+     * @param sinkMetrics metrics of the sink audit id
+     * @param alertRule rule to evaluate
+     */
+    public void evaluateAndReportAlert(List<AuditMetric> sourceMetrics,
+            List<AuditMetric> sinkMetrics,
+            AuditAlertRule alertRule) {
+        if (sourceMetrics == null || sinkMetrics == null || alertRule == null) {
+            return;
+        }
+
+        AuditAlertCondition condition = alertRule.getCondition();
+        if (condition == null || condition.getOperator() == null) {
+            // A switch on a null String throws NullPointerException, so reject incomplete rules up front
+            LOGGER.warn("Skip alert rule without condition or operator, groupId={}, streamId={}",
+                    alertRule.getInlongGroupId(), alertRule.getInlongStreamId());
+            return;
+        }
+        double threshold = condition.getValue();
+        String op = condition.getOperator();
+
+        for (AuditMetric source : sourceMetrics) {
+            boolean belongsToRule = Objects.equals(source.getInlongGroupId(), alertRule.getInlongGroupId())
+                    && Objects.equals(source.getInlongStreamId(), alertRule.getInlongStreamId());
+            // The zero check does not depend on the sink, so it is done once per source
+            if (!belongsToRule || source.getCount() == 0) {
+                continue;
+            }
+            for (AuditMetric sink : sinkMetrics) {
+                if (!Objects.equals(source.getInlongGroupId(), sink.getInlongGroupId())
+                        || !Objects.equals(source.getInlongStreamId(), sink.getInlongStreamId())) {
+                    continue;
+                }
+
+                double diff = (sink.getCount() - source.getCount()) / (double) source.getCount();
+                if (!isConditionMet(op, diff, threshold)) {
+                    continue;
+                }
+
+                LOGGER.error(
+                        "[ALERT] groupId={}, streamId={} | sourceCount={}, sinkCount={} | diff={} operator={} threshold={}",
+                        source.getInlongGroupId(), source.getInlongStreamId(),
+                        source.getCount(), sink.getCount(), diff, op, threshold);
+                if (prometheusReporter.getAuditMetric() != null) {
+                    prometheusReporter.getAuditMetric().updateSourceAndSinkAuditDiffMetric(diff);
+                }
+            }
+        }
+    }
+
+    /**
+     * Applies a comparison operator to the difference and the threshold.
+     *
+     * @param operator one of {@code >}, {@code >=}, {@code <}, {@code <=}, {@code ==}, {@code !=}
+     * @param diff relative difference between sink and source counts
+     * @param threshold value configured in the rule
+     * @return the comparison result, or {@code false} for any other operator
+     */
+    private static boolean isConditionMet(String operator, double diff, double threshold) {
+        switch (operator) {
+            case ">":
+                return diff > threshold;
+            case ">=":
+                return diff >= threshold;
+            case "<":
+                return diff < threshold;
+            case "<=":
+                return diff <= threshold;
+            case "==":
+                return diff == threshold;
+            case "!=":
+                return diff != threshold;
+            default:
+                return false;
+        }
+    }
+
+}
\ No newline at end of file
diff --git
a/inlong-audit/audit-tool/src/main/java/org/apache/inlong/audit/tool/reporter/OpenTelemetryReporter.java
b/inlong-audit/audit-tool/src/main/java/org/apache/inlong/audit/tool/reporter/OpenTelemetryReporter.java
new file mode 100644
index 0000000000..83f3a04947
--- /dev/null
+++
b/inlong-audit/audit-tool/src/main/java/org/apache/inlong/audit/tool/reporter/OpenTelemetryReporter.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.tool.reporter;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.LongCounter;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter;
+import io.opentelemetry.sdk.metrics.SdkMeterProvider;
+import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.inlong.audit.tool.config.ConfigConstants.*;
+
+public class OpenTelemetryReporter implements MetricReporter {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(OpenTelemetryReporter.class);
+
+    protected SdkMeterProvider meterProvider;
+    public Meter meter;
+    public LongCounter alertCounter;
+    // Latest data-loss-rate value per attribute set; the gauge callback in init() reads this map.
+    public final Map<Attributes, Double> dataLossRateValues = new ConcurrentHashMap<>();
+
+    /**
+     * Builds the OTLP gRPC exporter, the meter provider, the alert counter and the
+     * data-loss-rate gauge.
+     *
+     * @param config reporter configuration; {@code KEY_OTEL_ENDPOINT} selects the exporter
+     *        endpoint and falls back to {@code DEFAULT_OTEL_ENDPOINT} when absent
+     */
+    @Override
+    public void init(Map<String, Object> config) {
+        final String otlpEndpoint = (String) config.getOrDefault(KEY_OTEL_ENDPOINT, DEFAULT_OTEL_ENDPOINT);
+        final OtlpGrpcMetricExporter exporter = OtlpGrpcMetricExporter.builder().setEndpoint(otlpEndpoint).build();
+
+        // Only metrics are needed, so an SdkMeterProvider is built instead of a full OpenTelemetrySdk.
+        // The periodic reader is configured with a 30 second interval.
+        this.meterProvider = SdkMeterProvider.builder()
+                .registerMetricReader(
+                        PeriodicMetricReader.builder(exporter).setInterval(Duration.ofSeconds(30)).build())
+                .build();
+        this.meter = this.meterProvider.get(AUDIT_TOOL_NAME);
+
+        this.alertCounter = this.meter
+                .counterBuilder(AUDIT_TOOL_ALERTS_TOTAL)
+                .setDescription(DESC_AUDIT_TOOL_ALERTS_TOTAL)
+                .build();
+
+        // Observable gauge: the callback records every value currently held in dataLossRateValues.
+        this.meter
+                .gaugeBuilder(AUDIT_TOOL_DATA_LOSS_RATE)
+                .setDescription(DESC_AUDIT_TOOL_DATA_LOSS_RATE)
+                .buildWithCallback(measurement -> dataLossRateValues
+                        .forEach((attrs, rate) -> measurement.record(rate, attrs)));
+    }
+
+}
diff --git
a/inlong-audit/audit-tool/src/main/java/org/apache/inlong/audit/tool/task/AuditCheckTask.java
b/inlong-audit/audit-tool/src/main/java/org/apache/inlong/audit/tool/task/AuditCheckTask.java
new file mode 100644
index 0000000000..777bda40c1
--- /dev/null
+++
b/inlong-audit/audit-tool/src/main/java/org/apache/inlong/audit/tool/task/AuditCheckTask.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.tool.task;
+
+import org.apache.inlong.audit.tool.config.AppConfig;
+import org.apache.inlong.audit.tool.config.ConfigConstants;
+import org.apache.inlong.audit.tool.dto.AuditAlertRule;
+import org.apache.inlong.audit.tool.entity.AuditMetric;
+import org.apache.inlong.audit.tool.evaluator.AlertEvaluator;
+import org.apache.inlong.audit.tool.manager.AuditAlertRuleManager;
+import org.apache.inlong.audit.tool.service.AuditMetricService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * AuditCheckTask class: Periodically fetches audit data and evaluates alert
policies.
+ */
+public class AuditCheckTask {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(AuditCheckTask.class);
+    private static final DateTimeFormatter LOGS_FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+    private static final int DEFAULT_MINUTES = 1;
+    private static final String DEFAULT_SOURCE_AUDIT_ID = "5";
+
+    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
+    private final AlertEvaluator alertEvaluator;
+    private final AuditAlertRuleManager auditAlertRuleManager;
+    private final AuditMetricService auditMetricService;
+    // Period of the check in minutes.
+    // NOTE(review): this is read from KEY_DELAY_TIME, the same value as delayTimeMinute - confirm that is intended.
+    private Integer executionIntervalTime = DEFAULT_MINUTES;
+    // Length of the queried time window in minutes
+    private Integer intervalTimeMinute = DEFAULT_MINUTES;
+    // How far the queried window ends before "now", in minutes
+    private final Integer delayTimeMinute;
+    private String sourceAuditId = DEFAULT_SOURCE_AUDIT_ID;
+
+    /**
+     * Creates the task and reads its settings from the application configuration.
+     *
+     * <p>If the configuration cannot be read or parsed (including a {@code null} config),
+     * all three settings fall back to their defaults: source audit id 5, delay 1 minute,
+     * interval 1 minute.
+     *
+     * @param auditAlertRuleManager provides the sink audit ids and the alert rules
+     * @param alertEvaluator evaluates the fetched metrics against each rule
+     * @param appConfig application configuration holding the task properties
+     */
+    public AuditCheckTask(
+            AuditAlertRuleManager auditAlertRuleManager, AlertEvaluator alertEvaluator, AppConfig appConfig) {
+        this.auditAlertRuleManager = auditAlertRuleManager;
+        this.alertEvaluator = alertEvaluator;
+        this.auditMetricService = new AuditMetricService();
+        try {
+            this.executionIntervalTime =
+                    Integer.valueOf(appConfig.getProperties().getProperty(ConfigConstants.KEY_DELAY_TIME, "1"));
+            this.intervalTimeMinute =
+                    Integer.parseInt(appConfig.getProperties().getProperty(ConfigConstants.KEY_INTERVAL_TIME, "1"));
+            this.sourceAuditId = appConfig.getProperties().getProperty(ConfigConstants.KEY_SOURCE_AUDIT_ID, "5");
+        } catch (Exception e) {
+            // Reset everything so a partially applied configuration cannot contradict the message below
+            this.executionIntervalTime = DEFAULT_MINUTES;
+            this.intervalTimeMinute = DEFAULT_MINUTES;
+            this.sourceAuditId = DEFAULT_SOURCE_AUDIT_ID;
+            LOGGER.error(
+                    "Failed to read configuration information, default source AuditId is 5, delay execution time is 1, time interval is 1",
+                    e);
+        }
+        this.delayTimeMinute = executionIntervalTime;
+    }
+
+    /**
+     * Initiate the audit inspection task. The first check runs immediately, later checks
+     * run at a fixed rate.
+     */
+    public void start() {
+        // scheduleAtFixedRate rejects a non-positive period, so never pass less than one minute
+        long periodMinutes = Math.max(1, executionIntervalTime);
+        scheduler.scheduleAtFixedRate(this::checkAuditData, 0, periodMinutes, TimeUnit.MINUTES);
+    }
+
+    /**
+     * Scheduler entry point: runs one check and logs any failure.
+     *
+     * <p>An exception escaping a fixed-rate task suppresses all subsequent executions, so
+     * failures are caught here to keep the periodic check alive.
+     */
+    private void checkAuditData() {
+        try {
+            runCheck();
+        } catch (Exception e) {
+            LOGGER.error("Audit check failed, it will be retried at the next scheduled run", e);
+        }
+    }
+
+    /**
+     * Check audit data and trigger alert evaluation.
+     */
+    private void runCheck() {
+        // Obtain the sink auditIds and the alarm strategy; without either there is nothing to compare
+        List<String> sinkAuditIds = auditAlertRuleManager.getAuditIds();
+        List<AuditAlertRule> alertRules = auditAlertRuleManager.getAuditAlertRuleList();
+        if (sinkAuditIds == null || sinkAuditIds.isEmpty() || alertRules == null || alertRules.isEmpty()) {
+            return;
+        }
+
+        // Derive both ends of the query window from one clock reading so that they cannot
+        // straddle a minute rollover
+        LocalDateTime windowEnd = LocalDateTime.now().withSecond(0).minusMinutes(delayTimeMinute);
+        String startLogTs = windowEnd.minusMinutes(intervalTimeMinute).format(LOGS_FMT);
+        String endLogTs = windowEnd.format(LOGS_FMT);
+
+        // Query the relevant indicator data of the source auditId
+        List<AuditMetric> sourceAuditMetric =
+                auditMetricService.getStorageAuditMetrics(sourceAuditId, startLogTs, endLogTs);
+        if (sourceAuditMetric == null || sourceAuditMetric.isEmpty()) {
+            return;
+        }
+
+        // Compare the source indicator data with the indicator data of every sink auditId
+        for (String sinkAuditId : sinkAuditIds) {
+            List<AuditMetric> sinkAuditMetrics =
+                    auditMetricService.getStorageAuditMetrics(sinkAuditId, startLogTs, endLogTs);
+            if (sinkAuditMetrics == null || sinkAuditMetrics.isEmpty()) {
+                continue;
+            }
+            for (AuditAlertRule alertRule : alertRules) {
+                alertEvaluator.evaluateAndReportAlert(sourceAuditMetric, sinkAuditMetrics, alertRule);
+            }
+        }
+    }
+
+    /**
+     * Stop the audit inspection task. Waits up to five seconds for a running check before
+     * forcing the shutdown.
+     */
+    public void stop() {
+        scheduler.shutdown();
+        try {
+            if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
+                scheduler.shutdownNow();
+            }
+        } catch (InterruptedException e) {
+            scheduler.shutdownNow();
+            // Preserve the interrupt status for the caller
+            Thread.currentThread().interrupt();
+        }
+    }
+
+}
\ No newline at end of file
diff --git
a/inlong-audit/audit-tool/src/test/java/org/apache/inlong/tool/evaluator/AlertEvaluatorTest.java
b/inlong-audit/audit-tool/src/test/java/org/apache/inlong/tool/evaluator/AlertEvaluatorTest.java
new file mode 100644
index 0000000000..f7798e9f75
--- /dev/null
+++
b/inlong-audit/audit-tool/src/test/java/org/apache/inlong/tool/evaluator/AlertEvaluatorTest.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tool.evaluator;
+
+import org.apache.inlong.audit.tool.dto.AuditAlertCondition;
+import org.apache.inlong.audit.tool.dto.AuditAlertRule;
+import org.apache.inlong.audit.tool.entity.AuditMetric;
+import org.apache.inlong.audit.tool.evaluator.AlertEvaluator;
+import org.apache.inlong.audit.tool.manager.AuditAlertRuleManager;
+import org.apache.inlong.audit.tool.reporter.PrometheusReporter;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.Mockito.*;
+
+@ExtendWith(MockitoExtension.class)
+class AlertEvaluatorTest {
+
+    @Mock
+    private PrometheusReporter prometheusReporter;
+
+    @Mock
+    private AuditAlertRuleManager auditAlertRuleManager;
+
+    private AlertEvaluator alertEvaluator;
+
+    @BeforeEach
+    void setUp() {
+        alertEvaluator = new AlertEvaluator(prometheusReporter, auditAlertRuleManager);
+    }
+
+    @Test
+    void testEvaluateAndReportAlertWithNullMetrics() {
+        // Null source metrics
+        alertEvaluator.evaluateAndReportAlert(null, Collections.singletonList(new AuditMetric()),
+                new AuditAlertRule());
+        // Null sink metrics
+        alertEvaluator.evaluateAndReportAlert(Collections.singletonList(new AuditMetric()), null,
+                new AuditAlertRule());
+
+        // Neither call may touch the reporter
+        verifyNoInteractions(prometheusReporter);
+    }
+
+    @Test
+    void testEvaluateAndReportAlertWithNonMatchingGroupAndStream() {
+        // The sink belongs to a different group than the source
+        evaluateSinglePair(createAuditMetric("group1", 100L), createAuditMetric("group2", 90L),
+                createAlertRule(">", 0.1));
+
+        verifyNoInteractions(prometheusReporter);
+    }
+
+    @Test
+    void testEvaluateAndReportAlertWithZeroSourceCount() {
+        // A zero source count is skipped
+        evaluateSinglePair(createAuditMetric("group1", 0L), createAuditMetric("group1", 90L),
+                createAlertRule(">", 0.1));
+
+        verifyNoInteractions(prometheusReporter);
+    }
+
+    @Test
+    void testEvaluateAndReportAlertWithGreaterThanCondition() {
+        AuditMetric source = createAuditMetric("group1", 100L);
+        AuditAlertRule greaterThanRule = createAlertRule(">", 0.1);
+
+        // diff = (90-100)/100 = -0.1, which is not > 0.1
+        evaluateSinglePair(source, createAuditMetric("group1", 90L), greaterThanRule);
+        verifyNoInteractions(prometheusReporter);
+
+        // diff = (120-100)/100 = 0.2, which is > 0.1
+        evaluateSinglePair(source, createAuditMetric("group1", 120L), greaterThanRule);
+        verify(prometheusReporter, times(1)).getAuditMetric();
+    }
+
+    @Test
+    void testEvaluateAndReportAlertWithGreaterThanOrEqualCondition() {
+        // diff = 0.1, threshold 0.1
+        evaluateSinglePair(createAuditMetric("group1", 100L), createAuditMetric("group1", 110L),
+                createAlertRule(">=", 0.1));
+
+        verify(prometheusReporter, times(1)).getAuditMetric();
+    }
+
+    @Test
+    void testEvaluateAndReportAlertWithLessThanCondition() {
+        // diff = -0.1, and -0.1 < -0.05
+        evaluateSinglePair(createAuditMetric("group1", 100L), createAuditMetric("group1", 90L),
+                createAlertRule("<", -0.05));
+
+        verify(prometheusReporter, times(1)).getAuditMetric();
+    }
+
+    @Test
+    void testEvaluateAndReportAlertWithLessThanOrEqualCondition() {
+        // diff = -0.1, and -0.1 <= -0.1
+        evaluateSinglePair(createAuditMetric("group1", 100L), createAuditMetric("group1", 90L),
+                createAlertRule("<=", -0.1));
+
+        verify(prometheusReporter, times(1)).getAuditMetric();
+    }
+
+    @Test
+    void testEvaluateAndReportAlertWithEqualCondition() {
+        // diff = 0.1, threshold 0.1
+        evaluateSinglePair(createAuditMetric("group1", 100L), createAuditMetric("group1", 110L),
+                createAlertRule("==", 0.1));
+
+        verify(prometheusReporter, times(1)).getAuditMetric();
+    }
+
+    @Test
+    void testEvaluateAndReportAlertWithNotEqualCondition() {
+        // diff = 0.1, and 0.1 != 0.2
+        evaluateSinglePair(createAuditMetric("group1", 100L), createAuditMetric("group1", 110L),
+                createAlertRule("!=", 0.2));
+
+        verify(prometheusReporter, times(1)).getAuditMetric();
+    }
+
+    @Test
+    void testEvaluateAndReportAlertWithUnknownOperator() {
+        // An operator outside the supported set never produces an alert
+        evaluateSinglePair(createAuditMetric("group1", 100L), createAuditMetric("group1", 110L),
+                createAlertRule("unknown", 0.1));
+
+        verifyNoInteractions(prometheusReporter);
+    }
+
+    @Test
+    void testGetAuditAlertRuleManager() {
+        // The getter returns the manager passed to the constructor
+        assertEquals(auditAlertRuleManager, alertEvaluator.getAuditAlertRuleManager());
+    }
+
+    // Runs the evaluator on exactly one source metric and one sink metric
+    private void evaluateSinglePair(AuditMetric source, AuditMetric sink, AuditAlertRule rule) {
+        alertEvaluator.evaluateAndReportAlert(
+                Collections.singletonList(source),
+                Collections.singletonList(sink),
+                rule);
+    }
+
+    // Builds a metric for the given group on stream "stream1"
+    private AuditMetric createAuditMetric(String groupId, long count) {
+        AuditMetric result = new AuditMetric();
+        result.setInlongGroupId(groupId);
+        result.setInlongStreamId("stream1");
+        result.setCount(count);
+        return result;
+    }
+
+    // Builds a rule for group "group1" / stream "stream1" with the given condition
+    private AuditAlertRule createAlertRule(String operator, double value) {
+        AuditAlertCondition ruleCondition = new AuditAlertCondition();
+        ruleCondition.setOperator(operator);
+        ruleCondition.setValue(value);
+
+        AuditAlertRule result = new AuditAlertRule();
+        result.setInlongGroupId("group1");
+        result.setInlongStreamId("stream1");
+        result.setCondition(ruleCondition);
+        return result;
+    }
+}
diff --git
a/inlong-audit/audit-tool/src/test/java/org/apache/inlong/tool/service/AuditMetricServiceTest.java
b/inlong-audit/audit-tool/src/test/java/org/apache/inlong/tool/service/AuditMetricServiceTest.java
index 97e7349dd5..f7a7c4c0ec 100644
---
a/inlong-audit/audit-tool/src/test/java/org/apache/inlong/tool/service/AuditMetricServiceTest.java
+++
b/inlong-audit/audit-tool/src/test/java/org/apache/inlong/tool/service/AuditMetricServiceTest.java
@@ -19,54 +19,39 @@ package org.apache.inlong.tool.service;
import org.apache.inlong.audit.tool.config.AppConfig;
import org.apache.inlong.audit.tool.entity.AuditMetric;
-import org.apache.inlong.audit.tool.mapper.AuditMapper;
import org.apache.inlong.audit.tool.service.AuditMetricService;
import org.apache.inlong.audit.tool.util.AuditSQLUtil;
-import org.apache.ibatis.session.SqlSession;
import org.junit.jupiter.api.Test;
-import org.mockito.MockedStatic;
-import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.util.Collections;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
import java.util.List;
-import static org.junit.jupiter.api.Assertions.*;
-
public class AuditMetricServiceTest {
- @Test
- public void getStorageAuditMetricsReturnsEmptyListWhenNoDataFound() {
- AuditMetricService auditMetricService = new AuditMetricService();
-
- try (MockedStatic<AuditSQLUtil> sqlUtilMockedStatic =
Mockito.mockStatic(AuditSQLUtil.class)) {
- SqlSession sqlSessionMock = Mockito.mock(SqlSession.class);
- AuditMapper auditMapperMock = Mockito.mock(AuditMapper.class);
-
-
sqlUtilMockedStatic.when(AuditSQLUtil::getSqlSession).thenReturn(sqlSessionMock);
-
Mockito.when(sqlSessionMock.getMapper(AuditMapper.class)).thenReturn(auditMapperMock);
- Mockito.when(auditMapperMock.getAuditMetrics(Mockito.anyString(),
Mockito.anyString(), Mockito.anyString()))
- .thenReturn(Collections.emptyList());
-
- List<AuditMetric> result =
auditMetricService.getStorageAuditMetrics("nonexistentId", "2023-01-01
00:00:00",
- "2023-01-01 01:00:00");
- assertTrue(result.isEmpty());
- }
- }
+ private static final Logger LOGGER =
LoggerFactory.getLogger(AuditMetricServiceTest.class);
@Test
- public void getStorageAuditMetricsHandlesInvalidTimestampsGracefully() {
+ // NOTE(review): this test initializes AuditSQLUtil from AppConfig and queries through
+ // AuditMetricService, so it presumably needs a reachable audit database - confirm for CI.
+ public void testGetDataProxyAuditMetrics() {
// Query service initialization
AppConfig appConfig = new AppConfig();
AuditSQLUtil.initialize(appConfig.getProperties());
AuditMetricService auditMetricService = new AuditMetricService();
- String invalidStartLogTs = "invalid-timestamp";
- String invalidEndLogTs = "invalid-timestamp";
+ // Derive both ends of the query window from a single clock reading
+ DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+ LocalDateTime now = LocalDateTime.now();
+ String endLogTs = now.minusMinutes(1).format(formatter);
+ String startLogTs = now.minusMinutes(5).format(formatter);
- List<AuditMetric> result =
auditMetricService.getStorageAuditMetrics("5", invalidStartLogTs,
invalidEndLogTs);
+ // Search for relevant data
+ List<AuditMetric> dataproxyAuditMetrics =
+ auditMetricService.getStorageAuditMetrics("5", startLogTs, endLogTs);
- assertTrue(result.isEmpty());
+ // Fail with a clear message instead of a NullPointerException in the loop below
+ org.junit.jupiter.api.Assertions.assertNotNull(dataproxyAuditMetrics, "audit metrics query returned null");
+ for (AuditMetric auditMetric : dataproxyAuditMetrics) {
+ // Informational dump of the fetched rows, not an error
+ LOGGER.info("{} {} {}", auditMetric.getInlongGroupId(), auditMetric.getInlongStreamId(),
+ auditMetric.getCount());
+ }
}
-
}
diff --git
a/inlong-audit/audit-tool/src/test/java/org/apache/inlong/tool/task/AuditCheckTaskTest.java
b/inlong-audit/audit-tool/src/test/java/org/apache/inlong/tool/task/AuditCheckTaskTest.java
new file mode 100644
index 0000000000..c8ded99fcf
--- /dev/null
+++
b/inlong-audit/audit-tool/src/test/java/org/apache/inlong/tool/task/AuditCheckTaskTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tool.task;
+
+import org.apache.inlong.audit.tool.config.AppConfig;
+import org.apache.inlong.audit.tool.evaluator.AlertEvaluator;
+import org.apache.inlong.audit.tool.manager.AuditAlertRuleManager;
+import org.apache.inlong.audit.tool.task.AuditCheckTask;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.lang.reflect.Field;
+import java.util.Properties;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.Mockito.*;
+
+public class AuditCheckTaskTest {
+
+    // Property keys and defaults exactly as the AuditCheckTask constructor requests them
+    private static final String KEY_DELAY = "audit.data.time.delay.minute";
+    private static final String KEY_INTERVAL = "audit.data.time.interval.minute";
+    private static final String KEY_SOURCE_ID = "audit.id.source";
+
+    // openMocks() assigns the mock, so no initializer is needed here
+    @Mock
+    private AuditAlertRuleManager auditAlertRuleManager;
+
+    @Mock
+    private AlertEvaluator alertEvaluator;
+
+    @Mock
+    private AppConfig appConfig;
+
+    @Mock
+    private Properties properties;
+
+    private AuditCheckTask auditCheckTask;
+
+    @BeforeEach
+    public void setUp() {
+        MockitoAnnotations.openMocks(this);
+
+        // Mock AppConfig behavior
+        when(appConfig.getProperties()).thenReturn(properties);
+        when(properties.getProperty(KEY_DELAY, "1")).thenReturn("1");
+        when(properties.getProperty(KEY_INTERVAL, "1")).thenReturn("5");
+        when(properties.getProperty(KEY_SOURCE_ID, "5")).thenReturn("5");
+
+        // Create AuditCheckTask instance
+        auditCheckTask = new AuditCheckTask(auditAlertRuleManager, alertEvaluator, appConfig);
+    }
+
+    @Test
+    public void testConstructor() {
+        // Verify that the object is created successfully
+        assertNotNull(auditCheckTask);
+
+        // Verify that each property was read exactly once
+        verify(appConfig, times(3)).getProperties();
+        verify(properties, times(1)).getProperty(KEY_DELAY, "1");
+        verify(properties, times(1)).getProperty(KEY_INTERVAL, "1");
+        verify(properties, times(1)).getProperty(KEY_SOURCE_ID, "5");
+    }
+
+    @Test
+    public void testStartMethod() throws NoSuchFieldException, IllegalAccessException {
+        auditCheckTask.start();
+
+        // A started task must own a scheduler that is still accepting work
+        ScheduledExecutorService scheduler = schedulerOf(auditCheckTask);
+        assertNotNull(scheduler);
+        org.junit.jupiter.api.Assertions.assertFalse(scheduler.isShutdown());
+
+        // Stop the scheduler to clean up
+        auditCheckTask.stop();
+    }
+
+    @Test
+    public void testStopMethod() throws NoSuchFieldException, IllegalAccessException {
+        auditCheckTask.start();
+
+        auditCheckTask.stop();
+
+        // stop() must have shut the scheduler down
+        org.junit.jupiter.api.Assertions.assertTrue(schedulerOf(auditCheckTask).isShutdown());
+    }
+
+    @Test
+    public void testConstructorWithNullAppConfig() {
+        // A null AppConfig must not make the constructor throw
+        AuditCheckTask task = new AuditCheckTask(auditAlertRuleManager, alertEvaluator, null);
+        assertNotNull(task);
+    }
+
+    @Test
+    public void testConstructorWithNullProperties() {
+        // Null properties must not make the constructor throw
+        when(appConfig.getProperties()).thenReturn(null);
+        AuditCheckTask task = new AuditCheckTask(auditAlertRuleManager, alertEvaluator, appConfig);
+        assertNotNull(task);
+    }
+
+    @Test
+    public void testConstructorWithInvalidInterval() {
+        // Stub the two-argument overload: that is the one the constructor calls, so only
+        // this stub actually feeds the invalid value into the parsing code
+        when(properties.getProperty(KEY_INTERVAL, "1")).thenReturn("invalid");
+        AuditCheckTask task = new AuditCheckTask(auditAlertRuleManager, alertEvaluator, appConfig);
+        assertNotNull(task);
+    }
+
+    @Test
+    public void testConstructorWithEmptyInterval() {
+        // Same overload as above, with an empty value
+        when(properties.getProperty(KEY_INTERVAL, "1")).thenReturn("");
+        AuditCheckTask task = new AuditCheckTask(auditAlertRuleManager, alertEvaluator, appConfig);
+        assertNotNull(task);
+    }
+
+    // Reads the private "scheduler" field of the task through reflection
+    private static ScheduledExecutorService schedulerOf(AuditCheckTask task)
+            throws NoSuchFieldException, IllegalAccessException {
+        Field schedulerField = AuditCheckTask.class.getDeclaredField("scheduler");
+        schedulerField.setAccessible(true);
+        return (ScheduledExecutorService) schedulerField.get(task);
+    }
+}