This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 894c4a96914 KAFKA-17525 Convert the UnknownServerException to
InvalidRequestException when altering client-metrics config at runtime (#17168)
894c4a96914 is described below
commit 894c4a96914695ae72859adf2d26afca26ba531b
Author: TaiJuWu <[email protected]>
AuthorDate: Fri Oct 4 10:19:54 2024 +0800
KAFKA-17525 Convert the UnknownServerException to InvalidRequestException
when altering client-metrics config at runtime (#17168)
Reviewers: Apoorv Mittal <[email protected]>, TengYao Chi
<[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../test/java/kafka/admin/ClientTelemetryTest.java | 19 +++++++++++++++++++
.../kafka/server/metrics/ClientMetricsConfigs.java | 7 +++++--
2 files changed, 24 insertions(+), 2 deletions(-)
diff --git a/core/src/test/java/kafka/admin/ClientTelemetryTest.java
b/core/src/test/java/kafka/admin/ClientTelemetryTest.java
index ff044a8d2e4..9e2a9f0057b 100644
--- a/core/src/test/java/kafka/admin/ClientTelemetryTest.java
+++ b/core/src/test/java/kafka/admin/ClientTelemetryTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.serialization.StringDeserializer;
@@ -57,11 +58,14 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import static java.util.Arrays.asList;
import static
org.apache.kafka.clients.admin.AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ExtendWith(value = ClusterTestExtensions.class)
@@ -123,6 +127,17 @@ public class ClientTelemetryTest {
}
}
+ @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT})
+ public void testIntervalMsParser(ClusterInstance clusterInstance) {
+ List<String> alterOpts = asList("--bootstrap-server",
clusterInstance.bootstrapServers(),
+ "--alter", "--entity-type", "client-metrics", "--entity-name",
"test", "--add-config", "interval.ms=bbb");
+ try (Admin client = clusterInstance.createAdminClient()) {
+ ConfigCommand.ConfigCommandOptions addOpts = new
ConfigCommand.ConfigCommandOptions(toArray(alterOpts));
+
+ Throwable e = assertThrows(ExecutionException.class, () ->
ConfigCommand.alterConfig(client, addOpts));
+
assertTrue(e.getMessage().contains(InvalidConfigurationException.class.getSimpleName()));
+ }
+ }
@ClusterTest(types = Type.KRAFT)
public void testMetrics(ClusterInstance clusterInstance) {
@@ -141,6 +156,10 @@ public class ClientTelemetryTest {
}
}
+ private static String[] toArray(List<String>... lists) {
+ return Stream.of(lists).flatMap(List::stream).toArray(String[]::new);
+ }
+
/**
* We should add a ClientTelemetry into plugins to test the
clientInstanceId method Otherwise the
* {@link
org.apache.kafka.common.protocol.ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS } command
will not be supported
diff --git
a/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsConfigs.java
b/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsConfigs.java
index 3f19291fc83..830bd29a243 100644
---
a/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsConfigs.java
+++
b/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsConfigs.java
@@ -123,6 +123,7 @@ public class ClientMetricsConfigs extends AbstractConfig {
validateProperties(properties);
}
+ @SuppressWarnings("unchecked")
private static void validateProperties(Properties properties) {
// Make sure that all the properties are valid
properties.forEach((key, value) -> {
@@ -131,9 +132,11 @@ public class ClientMetricsConfigs extends AbstractConfig {
}
});
+ Map<String, Object> parsed = CONFIG.parse(properties);
+
// Make sure that push interval is between 100ms and 1 hour.
if (properties.containsKey(PUSH_INTERVAL_MS)) {
- int pushIntervalMs =
Integer.parseInt(properties.getProperty(PUSH_INTERVAL_MS));
+ Integer pushIntervalMs = (Integer) parsed.get(PUSH_INTERVAL_MS);
if (pushIntervalMs < MIN_INTERVAL_MS || pushIntervalMs >
MAX_INTERVAL_MS) {
String msg = String.format("Invalid value %s for %s, interval
must be between 100 and 3600000 (1 hour)",
pushIntervalMs, PUSH_INTERVAL_MS);
@@ -143,7 +146,7 @@ public class ClientMetricsConfigs extends AbstractConfig {
// Make sure that client match patterns are valid by parsing them.
if (properties.containsKey(CLIENT_MATCH_PATTERN)) {
- List<String> patterns =
Arrays.asList(properties.getProperty(CLIENT_MATCH_PATTERN).split(","));
+ List<String> patterns = (List<String>)
parsed.get(CLIENT_MATCH_PATTERN);
// Parse the client matching patterns to validate if the patterns
are valid.
parseMatchingPatterns(patterns);
}