This is an automated email from the ASF dual-hosted git repository.

dybyte pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 4a3ad1a104 [Improve][Zeta] Improve health metrics retrieval and 
parsing with enhanced logging (#10673)
4a3ad1a104 is described below

commit 4a3ad1a1046a38b697a79d40418eaacd7f6228cf
Author: Jast <[email protected]>
AuthorDate: Fri May 22 16:27:23 2026 +0800

    [Improve][Zeta] Improve health metrics retrieval and parsing with enhanced 
logging (#10673)
---
 .../apache/seatunnel/engine/e2e/LocalModeIT.java   |   1 +
 .../seatunnel/engine/client/SeaTunnelClient.java   | 139 ++++++++++++++++++---
 .../SeaTunnelClientHealthMetricsParsingTest.java   | 128 +++++++++++++++++++
 .../engine/server/SeaTunnelHealthMonitor.java      |   2 +-
 .../engine/server/rest/service/BaseService.java    |  78 ++++++++++--
 .../BaseServiceHealthMetricsParsingTest.java       | 107 ++++++++++++++++
 6 files changed, 430 insertions(+), 25 deletions(-)

diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/LocalModeIT.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/LocalModeIT.java
index b47a5b812b..45f13bf5c9 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/LocalModeIT.java
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/LocalModeIT.java
@@ -91,6 +91,7 @@ public class LocalModeIT {
             Map<String, String> clusterHealthMetrics = 
engineClient.getClusterHealthMetrics();
             Assertions.assertEquals(1, clusterHealthMetrics.size());
             
Assertions.assertTrue(clusterHealthMetrics.containsKey("[localhost]:9999"));
+            
Assertions.assertNotNull(engineClient.getHealthMetrics("[localhost]:9999"));
         } finally {
             if (engineClient != null) {
                 engineClient.close();
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
index e42b9195e5..4de18d8799 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
@@ -40,8 +40,15 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 public class SeaTunnelClient implements SeaTunnelClientInstance, AutoCloseable 
{
+    private static final int INVALID_METRICS_LOG_INTERVAL_MS = 60_000;
+    private static final int INVALID_METRICS_LOG_PREFIX_MAX_LEN = 512;
+    private static final int INVALID_METRICS_LOG_TOKEN_MAX_LEN = 128;
+    private static final int INVALID_METRICS_LOG_TOKEN_MAX_COUNT = 5;
+    private static final AtomicLong LAST_INVALID_METRICS_LOG_TIME_MS = new 
AtomicLong(0L);
     private final SeaTunnelHazelcastClient hazelcastClient;
     @Getter private final JobClient jobClient;
 
@@ -174,23 +181,125 @@ public class SeaTunnelClient implements 
SeaTunnelClientInstance, AutoCloseable {
         Map<String, String> healthMetricsMap = new HashMap<>();
         members.forEach(
                 member -> {
-                    String metrics =
-                            hazelcastClient.requestAndDecodeResponse(
-                                    member.getUuid(),
-                                    
SeaTunnelGetClusterHealthMetricsCodec.encodeRequest(),
-                                    
SeaTunnelGetClusterHealthMetricsCodec::decodeResponse);
-                    String[] split = metrics.split(",");
-                    Map<String, String> kvMap = new LinkedHashMap<>();
-                    Arrays.stream(split)
-                            .forEach(
-                                    kv -> {
-                                        String[] kvArr = kv.split("=");
-                                        kvMap.put(kvArr[0], kvArr[1]);
-                                    });
                     healthMetricsMap.put(
-                            member.getAddress().toString(), 
JsonUtils.toJsonString(kvMap));
+                            member.getAddress().toString(), 
getMetricsByMember(member));
                 });
-
         return healthMetricsMap;
     }
+
+    public String getHealthMetrics(String address) {
+        Set<Member> members = 
hazelcastClient.getHazelcastInstance().getCluster().getMembers();
+        Member member =
+                members.stream()
+                        .filter(m -> m.getAddress().toString().equals(address))
+                        .findFirst()
+                        .orElseThrow(
+                                () ->
+                                        new IllegalArgumentException(
+                                                "Member with address "
+                                                        + address
+                                                        + " not found in the 
cluster."));
+        return getMetricsByMember(member);
+    }
+
+    private String getMetricsByMember(Member member) {
+        String metrics =
+                hazelcastClient.requestAndDecodeResponse(
+                        member.getUuid(),
+                        SeaTunnelGetClusterHealthMetricsCodec.encodeRequest(),
+                        SeaTunnelGetClusterHealthMetricsCodec::decodeResponse);
+        return JsonUtils.toJsonString(
+                parseHealthMetricsString(metrics, 
member.getAddress().toString(), getLogger()));
+    }
+
+    static Map<String, String> parseHealthMetricsString(String metrics) {
+        return parseHealthMetricsString(metrics, null, null);
+    }
+
+    static Map<String, String> parseHealthMetricsString(
+            String metrics, String memberAddress, ILogger logger) {
+        Map<String, String> kvMap = new LinkedHashMap<>();
+        if (metrics == null || metrics.isEmpty()) {
+            return kvMap;
+        }
+
+        // SeaTunnelHealthMonitor#render uses ", " as pair separators. 
Splitting on ",\\s+" avoids
+        // breaking values which may contain commas (e.g. decimal separator 
under some locales).
+        String[] pairs = metrics.split(",\\s+");
+        List<String> invalidTokens =
+                Arrays.stream(pairs)
+                        .map(kv -> kv == null ? "" : kv.trim())
+                        .filter(trimmed -> !trimmed.isEmpty())
+                        .filter(trimmed -> trimmed.indexOf('=') <= 0)
+                        .limit(INVALID_METRICS_LOG_TOKEN_MAX_COUNT)
+                        .map(token -> truncateForLog(token, 
INVALID_METRICS_LOG_TOKEN_MAX_LEN))
+                        .collect(Collectors.toList());
+
+        Arrays.stream(pairs)
+                .forEach(
+                        kv -> {
+                            if (kv == null) {
+                                return;
+                            }
+                            String trimmed = kv.trim();
+                            if (trimmed.isEmpty()) {
+                                return;
+                            }
+                            int eqIndex = trimmed.indexOf('=');
+                            if (eqIndex <= 0) {
+                                return;
+                            }
+                            String key = trimmed.substring(0, eqIndex).trim();
+                            String value =
+                                    eqIndex == trimmed.length() - 1
+                                            ? ""
+                                            : trimmed.substring(eqIndex + 
1).trim();
+                            if (!key.isEmpty()) {
+                                kvMap.put(key, value);
+                            }
+                        });
+
+        logInvalidHealthMetricsIfNeeded(metrics, memberAddress, invalidTokens, 
logger);
+        return kvMap;
+    }
+
+    private static void logInvalidHealthMetricsIfNeeded(
+            String metrics, String memberAddress, List<String> invalidTokens, 
ILogger logger) {
+        if (logger == null || invalidTokens == null || 
invalidTokens.isEmpty()) {
+            return;
+        }
+        if (!logger.isWarningEnabled()) {
+            return;
+        }
+        long now = System.currentTimeMillis();
+        long last = LAST_INVALID_METRICS_LOG_TIME_MS.get();
+        if (now - last < INVALID_METRICS_LOG_INTERVAL_MS) {
+            return;
+        }
+        if (!LAST_INVALID_METRICS_LOG_TIME_MS.compareAndSet(last, now)) {
+            return;
+        }
+
+        String address = memberAddress == null ? "unknown" : memberAddress;
+        String prefix =
+                truncateForLog(metrics == null ? "" : metrics, 
INVALID_METRICS_LOG_PREFIX_MAX_LEN);
+        logger.warning(
+                "Invalid zeta health metrics token(s) from member "
+                        + address
+                        + ", tokens="
+                        + invalidTokens
+                        + ", rawPrefix="
+                        + prefix);
+    }
+
+    private static String truncateForLog(String s, int maxLen) {
+        if (s == null) {
+            return "";
+        }
+        String normalized = s.replace('\n', ' ').replace('\r', ' ');
+        if (maxLen <= 0) {
+            return "";
+        }
+        return normalized.length() <= maxLen ? normalized : 
normalized.substring(0, maxLen) + "...";
+    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientHealthMetricsParsingTest.java
 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientHealthMetricsParsingTest.java
new file mode 100644
index 0000000000..328b542ed7
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientHealthMetricsParsingTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.engine.client;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import com.hazelcast.logging.ILogger;
+
+import java.lang.reflect.Field;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class SeaTunnelClientHealthMetricsParsingTest {
+
+    @Test
+    public void testParseHealthMetricsStringNormal() {
+        Map<String, String> parsed = 
SeaTunnelClient.parseHealthMetricsString("a=1, b=2, c=3");
+        Assertions.assertEquals("1", parsed.get("a"));
+        Assertions.assertEquals("2", parsed.get("b"));
+        Assertions.assertEquals("3", parsed.get("c"));
+    }
+
+    @Test
+    public void testParseHealthMetricsStringIgnoreMalformedPairs() {
+        Map<String, String> parsed =
+                SeaTunnelClient.parseHealthMetricsString("a=1, broken, b=2, 
=x, c=");
+        Assertions.assertEquals("1", parsed.get("a"));
+        Assertions.assertEquals("2", parsed.get("b"));
+        Assertions.assertEquals("", parsed.get("c"));
+        Assertions.assertFalse(parsed.containsKey(""));
+    }
+
+    @Test
+    public void testParseHealthMetricsStringKeepCommaInsideValue() {
+        Map<String, String> parsed =
+                SeaTunnelClient.parseHealthMetricsString(
+                        "load.process=12,34%, heap.memory.used=1,2GB, 
connection.count=10");
+        Assertions.assertEquals("12,34%", parsed.get("load.process"));
+        Assertions.assertEquals("1,2GB", parsed.get("heap.memory.used"));
+        Assertions.assertEquals("10", parsed.get("connection.count"));
+    }
+
+    @Test
+    public void testParseHealthMetricsStringNullOrEmpty() {
+        
Assertions.assertTrue(SeaTunnelClient.parseHealthMetricsString(null).isEmpty());
+        
Assertions.assertTrue(SeaTunnelClient.parseHealthMetricsString("").isEmpty());
+        Assertions.assertTrue(SeaTunnelClient.parseHealthMetricsString("   
").isEmpty());
+    }
+
+    @Test
+    public void testParseHealthMetricsStringKeepAdditionalEqualsInValue() {
+        Map<String, String> parsed =
+                SeaTunnelClient.parseHealthMetricsString("token=a=b=c, 
processors=8");
+        Assertions.assertEquals("a=b=c", parsed.get("token"));
+        Assertions.assertEquals("8", parsed.get("processors"));
+    }
+
+    @Test
+    public void 
testParseHealthMetricsStringLogMalformedTokenWhenLoggerEnabled() {
+        long originalLastLogTime = setLastInvalidMetricsLogTimeMs(0L);
+        ILogger logger = Mockito.mock(ILogger.class);
+        Mockito.when(logger.isWarningEnabled()).thenReturn(true);
+
+        try {
+            Map<String, String> parsed =
+                    SeaTunnelClient.parseHealthMetricsString(
+                            "load.process=12,34%, broken, processors=8", 
"127.0.0.1:5801", logger);
+
+            Assertions.assertEquals("12,34%", parsed.get("load.process"));
+            Assertions.assertEquals("8", parsed.get("processors"));
+            Mockito.verify(logger)
+                    .warning(Mockito.contains("Invalid zeta health metrics 
token(s) from member"));
+        } finally {
+            setLastInvalidMetricsLogTimeMs(originalLastLogTime);
+        }
+    }
+
+    @Test
+    public void testParseHealthMetricsStringWithNullLogger() {
+        Map<String, String> parsed =
+                SeaTunnelClient.parseHealthMetricsString("a=1, broken, c=", 
"127.0.0.1:5801", null);
+        Assertions.assertEquals("1", parsed.get("a"));
+        Assertions.assertEquals("", parsed.get("c"));
+    }
+
+    @Test
+    public void testParseHealthMetricsStringLogRateLimit() {
+        long originalLastLogTime = setLastInvalidMetricsLogTimeMs(0L);
+        ILogger logger = Mockito.mock(ILogger.class);
+        Mockito.when(logger.isWarningEnabled()).thenReturn(true);
+
+        try {
+            SeaTunnelClient.parseHealthMetricsString("broken1, broken2", 
"127.0.0.1:5801", logger);
+            SeaTunnelClient.parseHealthMetricsString("broken3, broken4", 
"127.0.0.1:5802", logger);
+            Mockito.verify(logger, 
Mockito.times(1)).warning(Mockito.anyString());
+        } finally {
+            setLastInvalidMetricsLogTimeMs(originalLastLogTime);
+        }
+    }
+
+    private static long setLastInvalidMetricsLogTimeMs(long value) {
+        try {
+            Field field =
+                    
SeaTunnelClient.class.getDeclaredField("LAST_INVALID_METRICS_LOG_TIME_MS");
+            field.setAccessible(true);
+            AtomicLong lastInvalidMetricsLogTimeMs = (AtomicLong) 
field.get(null);
+            return lastInvalidMetricsLogTimeMs.getAndSet(value);
+        } catch (ReflectiveOperationException e) {
+            throw new RuntimeException("Failed to set 
LAST_INVALID_METRICS_LOG_TIME_MS", e);
+        }
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelHealthMonitor.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelHealthMonitor.java
index 10b3bb6a5a..988cbe3cd0 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelHealthMonitor.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelHealthMonitor.java
@@ -261,7 +261,7 @@ public class SeaTunnelHealthMonitor {
 
             double value = osSystemLoadAverage.read();
             if (value < 0) {
-                sb.append("load.systemAverage").append("=n/a ");
+                sb.append("load.systemAverage").append("=n/a, ");
             } else {
                 sb.append("load.systemAverage")
                         .append('=')
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java
index 4e2cd33eee..c2e5cf68e9 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java
@@ -81,6 +81,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -117,6 +118,11 @@ import static 
org.apache.seatunnel.engine.server.rest.RestConstant.TABLE_SOURCE_
 public abstract class BaseService {
 
     private static final int JOB_METRICS_LOG_TRUNCATE_LENGTH = 500;
+    private static final int INVALID_METRICS_LOG_INTERVAL_MS = 60_000;
+    private static final int INVALID_METRICS_LOG_PREFIX_MAX_LEN = 512;
+    private static final int INVALID_METRICS_LOG_TOKEN_MAX_LEN = 128;
+    private static final int INVALID_METRICS_LOG_TOKEN_MAX_COUNT = 5;
+    private static final AtomicLong LAST_INVALID_METRICS_LOG_TIME_MS = new 
AtomicLong(0L);
     private static final Pattern VERTEX_IDENTIFIER_PATTERN =
             Pattern.compile("((?:Sink|Source|Transform)\\[(\\d+)\\])");
 
@@ -933,17 +939,71 @@ public abstract class BaseService {
 
                                         log.error("Failed to get cluster 
health metrics", e);
                                     }
-                                    String[] parts = input.split(", ");
-                                    JsonObject jobInfo = new JsonObject();
-                                    Arrays.stream(parts)
-                                            .forEach(
-                                                    part -> {
-                                                        String[] keyValue = 
part.split("=");
-                                                        
jobInfo.add(keyValue[0], keyValue[1]);
-                                                    });
-                                    return jobInfo;
+                                    return parseSystemMonitoringMetrics(input, 
address);
                                 })
                         .collect(JsonArray::new, JsonArray::add, 
JsonArray::add);
         return jsonValues;
     }
+
+    private JsonObject parseSystemMonitoringMetrics(String input, Address 
memberAddress) {
+        JsonObject jobInfo = new JsonObject();
+        if (input == null || input.isEmpty()) {
+            return jobInfo;
+        }
+
+        String[] parts = input.split(",\\s+");
+        List<String> invalidTokens = new ArrayList<>();
+        for (String part : parts) {
+            if (part == null) {
+                continue;
+            }
+            String trimmed = part.trim();
+            if (trimmed.isEmpty()) {
+                continue;
+            }
+            int equalIndex = trimmed.indexOf('=');
+            if (equalIndex <= 0) {
+                if (invalidTokens.size() < 
INVALID_METRICS_LOG_TOKEN_MAX_COUNT) {
+                    invalidTokens.add(truncateForLog(trimmed, 
INVALID_METRICS_LOG_TOKEN_MAX_LEN));
+                }
+                continue;
+            }
+            String key = trimmed.substring(0, equalIndex).trim();
+            if (key.isEmpty()) {
+                continue;
+            }
+            String value =
+                    equalIndex == trimmed.length() - 1
+                            ? ""
+                            : trimmed.substring(equalIndex + 1).trim();
+            jobInfo.add(key, value);
+        }
+
+        if (!invalidTokens.isEmpty() && log.isWarnEnabled() && 
shouldLogInvalidMetrics()) {
+            log.warn(
+                    "Ignored malformed cluster health metrics token(s) from 
member {}, tokens={}, rawPrefix={}",
+                    memberAddress == null ? "unknown" : memberAddress,
+                    invalidTokens,
+                    truncateForLog(input, INVALID_METRICS_LOG_PREFIX_MAX_LEN));
+        }
+
+        return jobInfo;
+    }
+
+    private static boolean shouldLogInvalidMetrics() {
+        long now = System.currentTimeMillis();
+        long last = LAST_INVALID_METRICS_LOG_TIME_MS.get();
+        if (now - last < INVALID_METRICS_LOG_INTERVAL_MS) {
+            return false;
+        }
+        return LAST_INVALID_METRICS_LOG_TIME_MS.compareAndSet(last, now);
+    }
+
+    private static String truncateForLog(String input, int maxLen) {
+        if (input == null || maxLen <= 0) {
+            return "";
+        }
+        String normalized = input.replace('\n', ' ').replace('\r', ' ');
+        return normalized.length() <= maxLen ? normalized : 
normalized.substring(0, maxLen) + "...";
+    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/service/BaseServiceHealthMetricsParsingTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/service/BaseServiceHealthMetricsParsingTest.java
new file mode 100644
index 0000000000..d20536d977
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/service/BaseServiceHealthMetricsParsingTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.rest.service;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import com.hazelcast.cluster.Address;
+import com.hazelcast.internal.json.JsonObject;
+import com.hazelcast.spi.impl.NodeEngineImpl;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class BaseServiceHealthMetricsParsingTest {
+
+    private BaseService baseService;
+    private Address memberAddress;
+    private Method parseSystemMonitoringMetricsMethod;
+    private Method shouldLogInvalidMetricsMethod;
+
+    @BeforeEach
+    void setUp() throws Exception {
+        baseService = new 
JobInfoService(org.mockito.Mockito.mock(NodeEngineImpl.class));
+        memberAddress = new Address("127.0.0.1", 5801);
+        parseSystemMonitoringMetricsMethod =
+                BaseService.class.getDeclaredMethod(
+                        "parseSystemMonitoringMetrics", String.class, 
Address.class);
+        parseSystemMonitoringMetricsMethod.setAccessible(true);
+        shouldLogInvalidMetricsMethod =
+                BaseService.class.getDeclaredMethod("shouldLogInvalidMetrics");
+        shouldLogInvalidMetricsMethod.setAccessible(true);
+    }
+
+    @Test
+    void testParseSystemMonitoringMetricsNullInput() throws Exception {
+        JsonObject parsed =
+                (JsonObject)
+                        parseSystemMonitoringMetricsMethod.invoke(baseService, 
null, memberAddress);
+        Assertions.assertEquals(0, parsed.size());
+    }
+
+    @Test
+    void testParseSystemMonitoringMetricsIgnoreMalformedTokens() throws 
Exception {
+        JsonObject parsed =
+                (JsonObject)
+                        parseSystemMonitoringMetricsMethod.invoke(
+                                baseService, "a=1, broken, =x, c=", 
memberAddress);
+        Assertions.assertEquals("1", parsed.getString("a", null));
+        Assertions.assertEquals("", parsed.getString("c", null));
+        Assertions.assertNull(parsed.get("broken"));
+        Assertions.assertEquals(2, parsed.size());
+    }
+
+    @Test
+    void testParseSystemMonitoringMetricsKeepCommaInsideValue() throws 
Exception {
+        JsonObject parsed =
+                (JsonObject)
+                        parseSystemMonitoringMetricsMethod.invoke(
+                                baseService,
+                                "load.process=12,34%, heap.memory.used=1,2GB, 
connection.count=10",
+                                memberAddress);
+        Assertions.assertEquals("12,34%", parsed.getString("load.process", 
null));
+        Assertions.assertEquals("1,2GB", parsed.getString("heap.memory.used", 
null));
+        Assertions.assertEquals("10", parsed.getString("connection.count", 
null));
+    }
+
+    @Test
+    void testMalformedMetricsWarningRateLimit() throws Exception {
+        long originalLastLogTime = setLastInvalidMetricsLogTimeMs(0L);
+
+        try {
+            Assertions.assertTrue((boolean) 
shouldLogInvalidMetricsMethod.invoke(null));
+            Assertions.assertFalse((boolean) 
shouldLogInvalidMetricsMethod.invoke(null));
+        } finally {
+            setLastInvalidMetricsLogTimeMs(originalLastLogTime);
+        }
+    }
+
+    private static long setLastInvalidMetricsLogTimeMs(long value) {
+        try {
+            Field field = 
BaseService.class.getDeclaredField("LAST_INVALID_METRICS_LOG_TIME_MS");
+            field.setAccessible(true);
+            AtomicLong lastInvalidMetricsLogTimeMs = (AtomicLong) 
field.get(null);
+            return lastInvalidMetricsLogTimeMs.getAndSet(value);
+        } catch (ReflectiveOperationException e) {
+            throw new RuntimeException("Failed to set 
LAST_INVALID_METRICS_LOG_TIME_MS", e);
+        }
+    }
+}

Reply via email to