This is an automated email from the ASF dual-hosted git repository.

wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new db53b49aa9 [Hotfix][Metrics] fix sporadic multi-metrics ci (#7468)
db53b49aa9 is described below

commit db53b49aa9d2711fddeecc2ffcd5b46506b56224
Author: corgy-w <[email protected]>
AuthorDate: Fri Aug 23 17:54:48 2024 +0800

    [Hotfix][Metrics] fix sporadic multi-metrics ci (#7468)
    
    * [Hotfix][Metrics] fix sporadic multi-metrics ci
    
    * [Hotfix][Metrics] update
---
 .../engine/client/SeaTunnelClientTest.java         | 69 +++++++++++++++++-----
 1 file changed, 55 insertions(+), 14 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
index a8275a13b7..513b4eb29a 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
@@ -662,13 +662,13 @@ public class SeaTunnelClientTest {
             Assertions.assertEquals(
                     totalCount.get(SOURCE_RECEIVED_COUNT),
                     tableCount.entrySet().stream()
-                            .filter(e -> e.getKey().startsWith(SOURCE_RECEIVED_COUNT))
+                            .filter(e -> e.getKey().startsWith(SOURCE_RECEIVED_COUNT + "#"))
                             .mapToLong(Map.Entry::getValue)
                             .sum());
             Assertions.assertEquals(
                     totalCount.get(SINK_WRITE_COUNT),
                     tableCount.entrySet().stream()
-                            .filter(e -> e.getKey().startsWith(SINK_WRITE_COUNT))
+                            .filter(e -> e.getKey().startsWith(SINK_WRITE_COUNT + "#"))
                             .mapToLong(Map.Entry::getValue)
                             .sum());
             Assertions.assertEquals(
@@ -684,18 +684,59 @@ public class SeaTunnelClientTest {
                             .mapToLong(Map.Entry::getValue)
                             .sum());
             // Instantaneous rates in the same direction are directly added
-            Assertions.assertEquals(
-                    totalCount.get(SOURCE_RECEIVED_QPS),
-                    tableCount.entrySet().stream()
-                            .filter(e -> e.getKey().startsWith(SOURCE_RECEIVED_QPS + "#"))
-                            .mapToLong(Map.Entry::getValue)
-                            .sum());
-            Assertions.assertEquals(
-                    totalCount.get(SINK_WRITE_QPS),
-                    tableCount.entrySet().stream()
-                            .filter(e -> e.getKey().startsWith(SINK_WRITE_QPS + "#"))
-                            .mapToLong(Map.Entry::getValue)
-                            .sum());
+            // The difference between the total and the per-table sum must stay below 2% of the total value
+            Assertions.assertTrue(
+                    Math.abs(
+                                    totalCount.get(SOURCE_RECEIVED_QPS)
+                                            - tableCount.entrySet().stream()
+                                                    .filter(
+                                                            e ->
+                                                                    e.getKey()
+                                                                            
.startsWith(
+                                                                               
     SOURCE_RECEIVED_QPS
+                                                                               
             + "#"))
+                                                    
.mapToLong(Map.Entry::getValue)
+                                                    .sum())
+                            < totalCount.get(SOURCE_RECEIVED_QPS) * 0.02);
+            Assertions.assertTrue(
+                    Math.abs(
+                                    totalCount.get(SINK_WRITE_QPS)
+                                            - tableCount.entrySet().stream()
+                                                    .filter(
+                                                            e ->
+                                                                    e.getKey()
+                                                                            
.startsWith(
+                                                                               
     SINK_WRITE_QPS
+                                                                               
             + "#"))
+                                                    
.mapToLong(Map.Entry::getValue)
+                                                    .sum())
+                            < totalCount.get(SINK_WRITE_QPS) * 0.02);
+            Assertions.assertTrue(
+                    Math.abs(
+                                    
totalCount.get(SOURCE_RECEIVED_BYTES_PER_SECONDS)
+                                            - tableCount.entrySet().stream()
+                                                    .filter(
+                                                            e ->
+                                                                    e.getKey()
+                                                                            
.startsWith(
+                                                                               
     SOURCE_RECEIVED_BYTES_PER_SECONDS
+                                                                               
             + "#"))
+                                                    
.mapToLong(Map.Entry::getValue)
+                                                    .sum())
+                            < 
totalCount.get(SOURCE_RECEIVED_BYTES_PER_SECONDS) * 0.02);
+            Assertions.assertTrue(
+                    Math.abs(
+                                    
totalCount.get(SINK_WRITE_BYTES_PER_SECONDS)
+                                            - tableCount.entrySet().stream()
+                                                    .filter(
+                                                            e ->
+                                                                    e.getKey()
+                                                                            
.startsWith(
+                                                                               
     SINK_WRITE_BYTES_PER_SECONDS
+                                                                               
             + "#"))
+                                                    
.mapToLong(Map.Entry::getValue)
+                                                    .sum())
+                            < totalCount.get(SINK_WRITE_BYTES_PER_SECONDS) * 
0.02);
 
         } catch (ExecutionException | InterruptedException | JsonProcessingException e) {
             throw new RuntimeException(e);

Reply via email to