This is an automated email from the ASF dual-hosted git repository.
wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new db53b49aa9 [Hotfix][Metrics] fix sporadic multi-metrics ci (#7468)
db53b49aa9 is described below
commit db53b49aa9d2711fddeecc2ffcd5b46506b56224
Author: corgy-w <[email protected]>
AuthorDate: Fri Aug 23 17:54:48 2024 +0800
[Hotfix][Metrics] fix sporadic multi-metrics ci (#7468)
* [Hotfix][Metrics] fix sporadic multi-metrics ci
* [Hotfix][Metrics] update
---
.../engine/client/SeaTunnelClientTest.java | 69 +++++++++++++++++-----
1 file changed, 55 insertions(+), 14 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
index a8275a13b7..513b4eb29a 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
@@ -662,13 +662,13 @@ public class SeaTunnelClientTest {
Assertions.assertEquals(
totalCount.get(SOURCE_RECEIVED_COUNT),
tableCount.entrySet().stream()
- .filter(e ->
e.getKey().startsWith(SOURCE_RECEIVED_COUNT))
+ .filter(e ->
e.getKey().startsWith(SOURCE_RECEIVED_COUNT + "#"))
.mapToLong(Map.Entry::getValue)
.sum());
Assertions.assertEquals(
totalCount.get(SINK_WRITE_COUNT),
tableCount.entrySet().stream()
- .filter(e ->
e.getKey().startsWith(SINK_WRITE_COUNT))
+ .filter(e ->
e.getKey().startsWith(SINK_WRITE_COUNT + "#"))
.mapToLong(Map.Entry::getValue)
.sum());
Assertions.assertEquals(
@@ -684,18 +684,59 @@ public class SeaTunnelClientTest {
.mapToLong(Map.Entry::getValue)
.sum());
// Instantaneous rates in the same direction are directly added
- Assertions.assertEquals(
- totalCount.get(SOURCE_RECEIVED_QPS),
- tableCount.entrySet().stream()
- .filter(e ->
e.getKey().startsWith(SOURCE_RECEIVED_QPS + "#"))
- .mapToLong(Map.Entry::getValue)
- .sum());
- Assertions.assertEquals(
- totalCount.get(SINK_WRITE_QPS),
- tableCount.entrySet().stream()
- .filter(e -> e.getKey().startsWith(SINK_WRITE_QPS
+ "#"))
- .mapToLong(Map.Entry::getValue)
- .sum());
+ // The size does not fluctuate more than %2 of the total value
+ Assertions.assertTrue(
+ Math.abs(
+ totalCount.get(SOURCE_RECEIVED_QPS)
+ - tableCount.entrySet().stream()
+ .filter(
+ e ->
+ e.getKey()
+
.startsWith(
+
SOURCE_RECEIVED_QPS
+
+ "#"))
+
.mapToLong(Map.Entry::getValue)
+ .sum())
+ < totalCount.get(SOURCE_RECEIVED_QPS) * 0.02);
+ Assertions.assertTrue(
+ Math.abs(
+ totalCount.get(SINK_WRITE_QPS)
+ - tableCount.entrySet().stream()
+ .filter(
+ e ->
+ e.getKey()
+
.startsWith(
+
SINK_WRITE_QPS
+
+ "#"))
+
.mapToLong(Map.Entry::getValue)
+ .sum())
+ < totalCount.get(SINK_WRITE_QPS) * 0.02);
+ Assertions.assertTrue(
+ Math.abs(
+
totalCount.get(SOURCE_RECEIVED_BYTES_PER_SECONDS)
+ - tableCount.entrySet().stream()
+ .filter(
+ e ->
+ e.getKey()
+
.startsWith(
+
SOURCE_RECEIVED_BYTES_PER_SECONDS
+
+ "#"))
+
.mapToLong(Map.Entry::getValue)
+ .sum())
+ <
totalCount.get(SOURCE_RECEIVED_BYTES_PER_SECONDS) * 0.02);
+ Assertions.assertTrue(
+ Math.abs(
+
totalCount.get(SINK_WRITE_BYTES_PER_SECONDS)
+ - tableCount.entrySet().stream()
+ .filter(
+ e ->
+ e.getKey()
+
.startsWith(
+
SINK_WRITE_BYTES_PER_SECONDS
+
+ "#"))
+
.mapToLong(Map.Entry::getValue)
+ .sum())
+ < totalCount.get(SINK_WRITE_BYTES_PER_SECONDS) *
0.02);
} catch (ExecutionException | InterruptedException |
JsonProcessingException e) {
throw new RuntimeException(e);