This is an automated email from the ASF dual-hosted git repository.
dybyte pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 4a3ad1a104 [Improve][Zeta] Improve health metrics retrieval and parsing with enhanced logging (#10673)
4a3ad1a104 is described below
commit 4a3ad1a1046a38b697a79d40418eaacd7f6228cf
Author: Jast <[email protected]>
AuthorDate: Fri May 22 16:27:23 2026 +0800
[Improve][Zeta] Improve health metrics retrieval and parsing with enhanced logging (#10673)
---
.../apache/seatunnel/engine/e2e/LocalModeIT.java | 1 +
.../seatunnel/engine/client/SeaTunnelClient.java | 139 ++++++++++++++++++---
.../SeaTunnelClientHealthMetricsParsingTest.java | 128 +++++++++++++++++++
.../engine/server/SeaTunnelHealthMonitor.java | 2 +-
.../engine/server/rest/service/BaseService.java | 78 ++++++++++--
.../BaseServiceHealthMetricsParsingTest.java | 107 ++++++++++++++++
6 files changed, 430 insertions(+), 25 deletions(-)
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/LocalModeIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/LocalModeIT.java
index b47a5b812b..45f13bf5c9 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/LocalModeIT.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/LocalModeIT.java
@@ -91,6 +91,7 @@ public class LocalModeIT {
Map<String, String> clusterHealthMetrics =
engineClient.getClusterHealthMetrics();
Assertions.assertEquals(1, clusterHealthMetrics.size());
Assertions.assertTrue(clusterHealthMetrics.containsKey("[localhost]:9999"));
+
Assertions.assertNotNull(engineClient.getHealthMetrics("[localhost]:9999"));
} finally {
if (engineClient != null) {
engineClient.close();
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
index e42b9195e5..4de18d8799 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
@@ -40,8 +40,15 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
public class SeaTunnelClient implements SeaTunnelClientInstance, AutoCloseable
{
+ private static final int INVALID_METRICS_LOG_INTERVAL_MS = 60_000;
+ private static final int INVALID_METRICS_LOG_PREFIX_MAX_LEN = 512;
+ private static final int INVALID_METRICS_LOG_TOKEN_MAX_LEN = 128;
+ private static final int INVALID_METRICS_LOG_TOKEN_MAX_COUNT = 5;
+ private static final AtomicLong LAST_INVALID_METRICS_LOG_TIME_MS = new
AtomicLong(0L);
private final SeaTunnelHazelcastClient hazelcastClient;
@Getter private final JobClient jobClient;
@@ -174,23 +181,125 @@ public class SeaTunnelClient implements SeaTunnelClientInstance, AutoCloseable {
Map<String, String> healthMetricsMap = new HashMap<>();
members.forEach(
member -> {
- String metrics =
- hazelcastClient.requestAndDecodeResponse(
- member.getUuid(),
-
SeaTunnelGetClusterHealthMetricsCodec.encodeRequest(),
-
SeaTunnelGetClusterHealthMetricsCodec::decodeResponse);
- String[] split = metrics.split(",");
- Map<String, String> kvMap = new LinkedHashMap<>();
- Arrays.stream(split)
- .forEach(
- kv -> {
- String[] kvArr = kv.split("=");
- kvMap.put(kvArr[0], kvArr[1]);
- });
healthMetricsMap.put(
- member.getAddress().toString(),
JsonUtils.toJsonString(kvMap));
+ member.getAddress().toString(),
getMetricsByMember(member));
});
-
return healthMetricsMap;
}
+
+ public String getHealthMetrics(String address) {
+ Set<Member> members =
hazelcastClient.getHazelcastInstance().getCluster().getMembers();
+ Member member =
+ members.stream()
+ .filter(m -> m.getAddress().toString().equals(address))
+ .findFirst()
+ .orElseThrow(
+ () ->
+ new IllegalArgumentException(
+ "Member with address "
+ + address
+ + " not found in the
cluster."));
+ return getMetricsByMember(member);
+ }
+
+ private String getMetricsByMember(Member member) {
+ String metrics =
+ hazelcastClient.requestAndDecodeResponse(
+ member.getUuid(),
+ SeaTunnelGetClusterHealthMetricsCodec.encodeRequest(),
+ SeaTunnelGetClusterHealthMetricsCodec::decodeResponse);
+ return JsonUtils.toJsonString(
+ parseHealthMetricsString(metrics,
member.getAddress().toString(), getLogger()));
+ }
+
+ static Map<String, String> parseHealthMetricsString(String metrics) {
+ return parseHealthMetricsString(metrics, null, null);
+ }
+
+ static Map<String, String> parseHealthMetricsString(
+ String metrics, String memberAddress, ILogger logger) {
+ Map<String, String> kvMap = new LinkedHashMap<>();
+ if (metrics == null || metrics.isEmpty()) {
+ return kvMap;
+ }
+
+ // SeaTunnelHealthMonitor#render uses ", " as pair separators. Splitting on ",\\s+" avoids
+ // breaking values which may contain commas (e.g. decimal separator under some locales).
+ String[] pairs = metrics.split(",\\s+");
+ List<String> invalidTokens =
+ Arrays.stream(pairs)
+ .map(kv -> kv == null ? "" : kv.trim())
+ .filter(trimmed -> !trimmed.isEmpty())
+ .filter(trimmed -> trimmed.indexOf('=') <= 0)
+ .limit(INVALID_METRICS_LOG_TOKEN_MAX_COUNT)
+ .map(token -> truncateForLog(token,
INVALID_METRICS_LOG_TOKEN_MAX_LEN))
+ .collect(Collectors.toList());
+
+ Arrays.stream(pairs)
+ .forEach(
+ kv -> {
+ if (kv == null) {
+ return;
+ }
+ String trimmed = kv.trim();
+ if (trimmed.isEmpty()) {
+ return;
+ }
+ int eqIndex = trimmed.indexOf('=');
+ if (eqIndex <= 0) {
+ return;
+ }
+ String key = trimmed.substring(0, eqIndex).trim();
+ String value =
+ eqIndex == trimmed.length() - 1
+ ? ""
+ : trimmed.substring(eqIndex +
1).trim();
+ if (!key.isEmpty()) {
+ kvMap.put(key, value);
+ }
+ });
+
+ logInvalidHealthMetricsIfNeeded(metrics, memberAddress, invalidTokens,
logger);
+ return kvMap;
+ }
+
+ private static void logInvalidHealthMetricsIfNeeded(
+ String metrics, String memberAddress, List<String> invalidTokens,
ILogger logger) {
+ if (logger == null || invalidTokens == null ||
invalidTokens.isEmpty()) {
+ return;
+ }
+ if (!logger.isWarningEnabled()) {
+ return;
+ }
+ long now = System.currentTimeMillis();
+ long last = LAST_INVALID_METRICS_LOG_TIME_MS.get();
+ if (now - last < INVALID_METRICS_LOG_INTERVAL_MS) {
+ return;
+ }
+ if (!LAST_INVALID_METRICS_LOG_TIME_MS.compareAndSet(last, now)) {
+ return;
+ }
+
+ String address = memberAddress == null ? "unknown" : memberAddress;
+ String prefix =
+ truncateForLog(metrics == null ? "" : metrics,
INVALID_METRICS_LOG_PREFIX_MAX_LEN);
+ logger.warning(
+ "Invalid zeta health metrics token(s) from member "
+ + address
+ + ", tokens="
+ + invalidTokens
+ + ", rawPrefix="
+ + prefix);
+ }
+
+ private static String truncateForLog(String s, int maxLen) {
+ if (s == null) {
+ return "";
+ }
+ String normalized = s.replace('\n', ' ').replace('\r', ' ');
+ if (maxLen <= 0) {
+ return "";
+ }
+ return normalized.length() <= maxLen ? normalized :
normalized.substring(0, maxLen) + "...";
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientHealthMetricsParsingTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientHealthMetricsParsingTest.java
new file mode 100644
index 0000000000..328b542ed7
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientHealthMetricsParsingTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.engine.client;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import com.hazelcast.logging.ILogger;
+
+import java.lang.reflect.Field;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class SeaTunnelClientHealthMetricsParsingTest {
+
+ @Test
+ public void testParseHealthMetricsStringNormal() {
+ Map<String, String> parsed =
SeaTunnelClient.parseHealthMetricsString("a=1, b=2, c=3");
+ Assertions.assertEquals("1", parsed.get("a"));
+ Assertions.assertEquals("2", parsed.get("b"));
+ Assertions.assertEquals("3", parsed.get("c"));
+ }
+
+ @Test
+ public void testParseHealthMetricsStringIgnoreMalformedPairs() {
+ Map<String, String> parsed =
+ SeaTunnelClient.parseHealthMetricsString("a=1, broken, b=2,
=x, c=");
+ Assertions.assertEquals("1", parsed.get("a"));
+ Assertions.assertEquals("2", parsed.get("b"));
+ Assertions.assertEquals("", parsed.get("c"));
+ Assertions.assertFalse(parsed.containsKey(""));
+ }
+
+ @Test
+ public void testParseHealthMetricsStringKeepCommaInsideValue() {
+ Map<String, String> parsed =
+ SeaTunnelClient.parseHealthMetricsString(
+ "load.process=12,34%, heap.memory.used=1,2GB,
connection.count=10");
+ Assertions.assertEquals("12,34%", parsed.get("load.process"));
+ Assertions.assertEquals("1,2GB", parsed.get("heap.memory.used"));
+ Assertions.assertEquals("10", parsed.get("connection.count"));
+ }
+
+ @Test
+ public void testParseHealthMetricsStringNullOrEmpty() {
+
Assertions.assertTrue(SeaTunnelClient.parseHealthMetricsString(null).isEmpty());
+
Assertions.assertTrue(SeaTunnelClient.parseHealthMetricsString("").isEmpty());
+ Assertions.assertTrue(SeaTunnelClient.parseHealthMetricsString("
").isEmpty());
+ }
+
+ @Test
+ public void testParseHealthMetricsStringKeepAdditionalEqualsInValue() {
+ Map<String, String> parsed =
+ SeaTunnelClient.parseHealthMetricsString("token=a=b=c,
processors=8");
+ Assertions.assertEquals("a=b=c", parsed.get("token"));
+ Assertions.assertEquals("8", parsed.get("processors"));
+ }
+
+ @Test
+ public void
testParseHealthMetricsStringLogMalformedTokenWhenLoggerEnabled() {
+ long originalLastLogTime = setLastInvalidMetricsLogTimeMs(0L);
+ ILogger logger = Mockito.mock(ILogger.class);
+ Mockito.when(logger.isWarningEnabled()).thenReturn(true);
+
+ try {
+ Map<String, String> parsed =
+ SeaTunnelClient.parseHealthMetricsString(
+ "load.process=12,34%, broken, processors=8",
"127.0.0.1:5801", logger);
+
+ Assertions.assertEquals("12,34%", parsed.get("load.process"));
+ Assertions.assertEquals("8", parsed.get("processors"));
+ Mockito.verify(logger)
+ .warning(Mockito.contains("Invalid zeta health metrics
token(s) from member"));
+ } finally {
+ setLastInvalidMetricsLogTimeMs(originalLastLogTime);
+ }
+ }
+
+ @Test
+ public void testParseHealthMetricsStringWithNullLogger() {
+ Map<String, String> parsed =
+ SeaTunnelClient.parseHealthMetricsString("a=1, broken, c=",
"127.0.0.1:5801", null);
+ Assertions.assertEquals("1", parsed.get("a"));
+ Assertions.assertEquals("", parsed.get("c"));
+ }
+
+ @Test
+ public void testParseHealthMetricsStringLogRateLimit() {
+ long originalLastLogTime = setLastInvalidMetricsLogTimeMs(0L);
+ ILogger logger = Mockito.mock(ILogger.class);
+ Mockito.when(logger.isWarningEnabled()).thenReturn(true);
+
+ try {
+ SeaTunnelClient.parseHealthMetricsString("broken1, broken2",
"127.0.0.1:5801", logger);
+ SeaTunnelClient.parseHealthMetricsString("broken3, broken4",
"127.0.0.1:5802", logger);
+ Mockito.verify(logger,
Mockito.times(1)).warning(Mockito.anyString());
+ } finally {
+ setLastInvalidMetricsLogTimeMs(originalLastLogTime);
+ }
+ }
+
+ private static long setLastInvalidMetricsLogTimeMs(long value) {
+ try {
+ Field field =
+
SeaTunnelClient.class.getDeclaredField("LAST_INVALID_METRICS_LOG_TIME_MS");
+ field.setAccessible(true);
+ AtomicLong lastInvalidMetricsLogTimeMs = (AtomicLong)
field.get(null);
+ return lastInvalidMetricsLogTimeMs.getAndSet(value);
+ } catch (ReflectiveOperationException e) {
+ throw new RuntimeException("Failed to set
LAST_INVALID_METRICS_LOG_TIME_MS", e);
+ }
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelHealthMonitor.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelHealthMonitor.java
index 10b3bb6a5a..988cbe3cd0 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelHealthMonitor.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelHealthMonitor.java
@@ -261,7 +261,7 @@ public class SeaTunnelHealthMonitor {
double value = osSystemLoadAverage.read();
if (value < 0) {
- sb.append("load.systemAverage").append("=n/a ");
+ sb.append("load.systemAverage").append("=n/a, ");
} else {
sb.append("load.systemAverage")
.append('=')
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java
index 4e2cd33eee..c2e5cf68e9 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java
@@ -81,6 +81,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -117,6 +118,11 @@ import static org.apache.seatunnel.engine.server.rest.RestConstant.TABLE_SOURCE_
public abstract class BaseService {
private static final int JOB_METRICS_LOG_TRUNCATE_LENGTH = 500;
+ private static final int INVALID_METRICS_LOG_INTERVAL_MS = 60_000;
+ private static final int INVALID_METRICS_LOG_PREFIX_MAX_LEN = 512;
+ private static final int INVALID_METRICS_LOG_TOKEN_MAX_LEN = 128;
+ private static final int INVALID_METRICS_LOG_TOKEN_MAX_COUNT = 5;
+ private static final AtomicLong LAST_INVALID_METRICS_LOG_TIME_MS = new
AtomicLong(0L);
private static final Pattern VERTEX_IDENTIFIER_PATTERN =
Pattern.compile("((?:Sink|Source|Transform)\\[(\\d+)\\])");
@@ -933,17 +939,71 @@ public abstract class BaseService {
log.error("Failed to get cluster
health metrics", e);
}
- String[] parts = input.split(", ");
- JsonObject jobInfo = new JsonObject();
- Arrays.stream(parts)
- .forEach(
- part -> {
- String[] keyValue =
part.split("=");
-
jobInfo.add(keyValue[0], keyValue[1]);
- });
- return jobInfo;
+ return parseSystemMonitoringMetrics(input,
address);
})
.collect(JsonArray::new, JsonArray::add,
JsonArray::add);
return jsonValues;
}
+
+ private JsonObject parseSystemMonitoringMetrics(String input, Address
memberAddress) {
+ JsonObject jobInfo = new JsonObject();
+ if (input == null || input.isEmpty()) {
+ return jobInfo;
+ }
+
+ String[] parts = input.split(",\\s+");
+ List<String> invalidTokens = new ArrayList<>();
+ for (String part : parts) {
+ if (part == null) {
+ continue;
+ }
+ String trimmed = part.trim();
+ if (trimmed.isEmpty()) {
+ continue;
+ }
+ int equalIndex = trimmed.indexOf('=');
+ if (equalIndex <= 0) {
+ if (invalidTokens.size() <
INVALID_METRICS_LOG_TOKEN_MAX_COUNT) {
+ invalidTokens.add(truncateForLog(trimmed,
INVALID_METRICS_LOG_TOKEN_MAX_LEN));
+ }
+ continue;
+ }
+ String key = trimmed.substring(0, equalIndex).trim();
+ if (key.isEmpty()) {
+ continue;
+ }
+ String value =
+ equalIndex == trimmed.length() - 1
+ ? ""
+ : trimmed.substring(equalIndex + 1).trim();
+ jobInfo.add(key, value);
+ }
+
+ if (!invalidTokens.isEmpty() && log.isWarnEnabled() &&
shouldLogInvalidMetrics()) {
+ log.warn(
+ "Ignored malformed cluster health metrics token(s) from
member {}, tokens={}, rawPrefix={}",
+ memberAddress == null ? "unknown" : memberAddress,
+ invalidTokens,
+ truncateForLog(input, INVALID_METRICS_LOG_PREFIX_MAX_LEN));
+ }
+
+ return jobInfo;
+ }
+
+ private static boolean shouldLogInvalidMetrics() {
+ long now = System.currentTimeMillis();
+ long last = LAST_INVALID_METRICS_LOG_TIME_MS.get();
+ if (now - last < INVALID_METRICS_LOG_INTERVAL_MS) {
+ return false;
+ }
+ return LAST_INVALID_METRICS_LOG_TIME_MS.compareAndSet(last, now);
+ }
+
+ private static String truncateForLog(String input, int maxLen) {
+ if (input == null || maxLen <= 0) {
+ return "";
+ }
+ String normalized = input.replace('\n', ' ').replace('\r', ' ');
+ return normalized.length() <= maxLen ? normalized :
normalized.substring(0, maxLen) + "...";
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/service/BaseServiceHealthMetricsParsingTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/service/BaseServiceHealthMetricsParsingTest.java
new file mode 100644
index 0000000000..d20536d977
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/service/BaseServiceHealthMetricsParsingTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.rest.service;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import com.hazelcast.cluster.Address;
+import com.hazelcast.internal.json.JsonObject;
+import com.hazelcast.spi.impl.NodeEngineImpl;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class BaseServiceHealthMetricsParsingTest {
+
+ private BaseService baseService;
+ private Address memberAddress;
+ private Method parseSystemMonitoringMetricsMethod;
+ private Method shouldLogInvalidMetricsMethod;
+
+ @BeforeEach
+ void setUp() throws Exception {
+ baseService = new
JobInfoService(org.mockito.Mockito.mock(NodeEngineImpl.class));
+ memberAddress = new Address("127.0.0.1", 5801);
+ parseSystemMonitoringMetricsMethod =
+ BaseService.class.getDeclaredMethod(
+ "parseSystemMonitoringMetrics", String.class,
Address.class);
+ parseSystemMonitoringMetricsMethod.setAccessible(true);
+ shouldLogInvalidMetricsMethod =
+ BaseService.class.getDeclaredMethod("shouldLogInvalidMetrics");
+ shouldLogInvalidMetricsMethod.setAccessible(true);
+ }
+
+ @Test
+ void testParseSystemMonitoringMetricsNullInput() throws Exception {
+ JsonObject parsed =
+ (JsonObject)
+ parseSystemMonitoringMetricsMethod.invoke(baseService,
null, memberAddress);
+ Assertions.assertEquals(0, parsed.size());
+ }
+
+ @Test
+ void testParseSystemMonitoringMetricsIgnoreMalformedTokens() throws
Exception {
+ JsonObject parsed =
+ (JsonObject)
+ parseSystemMonitoringMetricsMethod.invoke(
+ baseService, "a=1, broken, =x, c=",
memberAddress);
+ Assertions.assertEquals("1", parsed.getString("a", null));
+ Assertions.assertEquals("", parsed.getString("c", null));
+ Assertions.assertNull(parsed.get("broken"));
+ Assertions.assertEquals(2, parsed.size());
+ }
+
+ @Test
+ void testParseSystemMonitoringMetricsKeepCommaInsideValue() throws
Exception {
+ JsonObject parsed =
+ (JsonObject)
+ parseSystemMonitoringMetricsMethod.invoke(
+ baseService,
+ "load.process=12,34%, heap.memory.used=1,2GB,
connection.count=10",
+ memberAddress);
+ Assertions.assertEquals("12,34%", parsed.getString("load.process",
null));
+ Assertions.assertEquals("1,2GB", parsed.getString("heap.memory.used",
null));
+ Assertions.assertEquals("10", parsed.getString("connection.count",
null));
+ }
+
+ @Test
+ void testMalformedMetricsWarningRateLimit() throws Exception {
+ long originalLastLogTime = setLastInvalidMetricsLogTimeMs(0L);
+
+ try {
+ Assertions.assertTrue((boolean)
shouldLogInvalidMetricsMethod.invoke(null));
+ Assertions.assertFalse((boolean)
shouldLogInvalidMetricsMethod.invoke(null));
+ } finally {
+ setLastInvalidMetricsLogTimeMs(originalLastLogTime);
+ }
+ }
+
+ private static long setLastInvalidMetricsLogTimeMs(long value) {
+ try {
+ Field field =
BaseService.class.getDeclaredField("LAST_INVALID_METRICS_LOG_TIME_MS");
+ field.setAccessible(true);
+ AtomicLong lastInvalidMetricsLogTimeMs = (AtomicLong)
field.get(null);
+ return lastInvalidMetricsLogTimeMs.getAndSet(value);
+ } catch (ReflectiveOperationException e) {
+ throw new RuntimeException("Failed to set
LAST_INVALID_METRICS_LOG_TIME_MS", e);
+ }
+ }
+}