This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 894c4a96914 KAFKA-17525 Convert the UnknownServerException to 
InvalidRequestException when altering client-metrics config at runtime (#17168)
894c4a96914 is described below

commit 894c4a96914695ae72859adf2d26afca26ba531b
Author: TaiJuWu <[email protected]>
AuthorDate: Fri Oct 4 10:19:54 2024 +0800

    KAFKA-17525 Convert the UnknownServerException to InvalidRequestException 
when altering client-metrics config at runtime (#17168)
    
    Reviewers: Apoorv Mittal <[email protected]>, TengYao Chi 
<[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../test/java/kafka/admin/ClientTelemetryTest.java    | 19 +++++++++++++++++++
 .../kafka/server/metrics/ClientMetricsConfigs.java    |  7 +++++--
 2 files changed, 24 insertions(+), 2 deletions(-)

diff --git a/core/src/test/java/kafka/admin/ClientTelemetryTest.java 
b/core/src/test/java/kafka/admin/ClientTelemetryTest.java
index ff044a8d2e4..9e2a9f0057b 100644
--- a/core/src/test/java/kafka/admin/ClientTelemetryTest.java
+++ b/core/src/test/java/kafka/admin/ClientTelemetryTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.serialization.StringDeserializer;
@@ -57,11 +58,14 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
+import static java.util.Arrays.asList;
 import static 
org.apache.kafka.clients.admin.AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 @ExtendWith(value = ClusterTestExtensions.class)
@@ -123,6 +127,17 @@ public class ClientTelemetryTest {
         }
     }
 
+    @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT})
+    public void testIntervalMsParser(ClusterInstance clusterInstance) {
+        List<String> alterOpts = asList("--bootstrap-server", 
clusterInstance.bootstrapServers(),
+                "--alter", "--entity-type", "client-metrics", "--entity-name", 
"test", "--add-config", "interval.ms=bbb");
+        try (Admin client = clusterInstance.createAdminClient()) {
+            ConfigCommand.ConfigCommandOptions addOpts = new 
ConfigCommand.ConfigCommandOptions(toArray(alterOpts));
+
+            Throwable e = assertThrows(ExecutionException.class, () -> 
ConfigCommand.alterConfig(client, addOpts));
+            
assertTrue(e.getMessage().contains(InvalidConfigurationException.class.getSimpleName()));
+        }
+    }
 
     @ClusterTest(types = Type.KRAFT)
     public void testMetrics(ClusterInstance clusterInstance) {
@@ -141,6 +156,10 @@ public class ClientTelemetryTest {
         }
     }
 
+    private static String[] toArray(List<String>... lists) {
+        return Stream.of(lists).flatMap(List::stream).toArray(String[]::new);
+    }
+
     /**
      * We should add a ClientTelemetry into plugins to test the 
clientInstanceId method Otherwise the
      * {@link  
org.apache.kafka.common.protocol.ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS } command 
will not be supported
diff --git 
a/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsConfigs.java
 
b/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsConfigs.java
index 3f19291fc83..830bd29a243 100644
--- 
a/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsConfigs.java
+++ 
b/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsConfigs.java
@@ -123,6 +123,7 @@ public class ClientMetricsConfigs extends AbstractConfig {
         validateProperties(properties);
     }
 
+    @SuppressWarnings("unchecked")
     private static void validateProperties(Properties properties) {
         // Make sure that all the properties are valid
         properties.forEach((key, value) -> {
@@ -131,9 +132,11 @@ public class ClientMetricsConfigs extends AbstractConfig {
             }
         });
 
+        Map<String, Object> parsed = CONFIG.parse(properties);
+
         // Make sure that push interval is between 100ms and 1 hour.
         if (properties.containsKey(PUSH_INTERVAL_MS)) {
-            int pushIntervalMs = 
Integer.parseInt(properties.getProperty(PUSH_INTERVAL_MS));
+            Integer pushIntervalMs = (Integer) parsed.get(PUSH_INTERVAL_MS);
             if (pushIntervalMs < MIN_INTERVAL_MS || pushIntervalMs > 
MAX_INTERVAL_MS) {
                 String msg = String.format("Invalid value %s for %s, interval 
must be between 100 and 3600000 (1 hour)",
                     pushIntervalMs, PUSH_INTERVAL_MS);
@@ -143,7 +146,7 @@ public class ClientMetricsConfigs extends AbstractConfig {
 
         // Make sure that client match patterns are valid by parsing them.
         if (properties.containsKey(CLIENT_MATCH_PATTERN)) {
-            List<String> patterns = 
Arrays.asList(properties.getProperty(CLIENT_MATCH_PATTERN).split(","));
+            List<String> patterns = (List<String>) 
parsed.get(CLIENT_MATCH_PATTERN);
             // Parse the client matching patterns to validate if the patterns 
are valid.
             parseMatchingPatterns(patterns);
         }

Reply via email to