This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8bc4c33  KAFKA-13204: assignor name conflict check (#11217)
8bc4c33 is described below

commit 8bc4c334e36ac6fafe6e1d4a468da713336447e3
Author: Luke Chen <[email protected]>
AuthorDate: Thu Aug 19 08:23:29 2021 +0800

    KAFKA-13204: assignor name conflict check (#11217)
    
    Add a check for conflicting partition assignor names to avoid the wrong
    assignor being used.
    
    Reviewers: Anna Sophie Blee-Goldman <[email protected]>
---
 .../consumer/ConsumerPartitionAssignor.java        |  9 +++++
 .../consumer/ConsumerPartitionAssignorTest.java    | 45 ++++++++++++++++++++++
 .../kafka/clients/consumer/KafkaConsumerTest.java  | 11 ++++++
 3 files changed, 65 insertions(+)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
index 0704e33..a541b8a 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients.consumer;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Optional;
 import java.util.Collections;
 import java.util.List;
@@ -264,6 +265,8 @@ public interface ConsumerPartitionAssignor {
      */
     static List<ConsumerPartitionAssignor> getAssignorInstances(List<String> 
assignorClasses, Map<String, Object> configs) {
         List<ConsumerPartitionAssignor> assignors = new ArrayList<>();
+        // a map to store assignor name -> assignor class name
+        Map<String, String> assignorNameMap = new HashMap<>();
 
         if (assignorClasses == null)
             return assignors;
@@ -284,6 +287,12 @@ public interface ConsumerPartitionAssignor {
                     ((Configurable) assignor).configure(configs);
 
                 if (assignor instanceof ConsumerPartitionAssignor) {
+                    String assignorName = ((ConsumerPartitionAssignor) 
assignor).name();
+                    if (assignorNameMap.containsKey(assignorName)) {
+                        throw new KafkaException("The assignor name: '" + 
assignorName + "' is used in more than one assignor: " +
+                            assignorNameMap.get(assignorName) + ", " + 
assignor.getClass().getName());
+                    }
+                    assignorNameMap.put(assignorName, 
assignor.getClass().getName());
                     assignors.add((ConsumerPartitionAssignor) assignor);
                 } else {
                     throw new KafkaException(klass + " is not an instance of " 
+ ConsumerPartitionAssignor.class.getName());
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignorTest.java
index 0307000..1298f8c 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignorTest.java
@@ -17,14 +17,17 @@
 package org.apache.kafka.clients.consumer;
 
 
+import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.junit.jupiter.api.Test;
 
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
+import java.util.Set;
 
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
@@ -99,6 +102,48 @@ public class ConsumerPartitionAssignorTest {
         assertThrows(KafkaException.class, () -> 
getAssignorInstances(classTypes, Collections.emptyMap()));
     }
 
+    @Test
+    public void shouldThrowKafkaExceptionOnAssignorsWithSameName() {
+        assertThrows(KafkaException.class, () -> getAssignorInstances(
+            Arrays.asList(RangeAssignor.class.getName(), 
TestConsumerPartitionAssignor.class.getName()),
+            Collections.emptyMap()
+        ));
+    }
+
+    public static class TestConsumerPartitionAssignor implements 
ConsumerPartitionAssignor {
+
+        @Override
+        public ByteBuffer subscriptionUserData(Set<String> topics) {
+            return 
ConsumerPartitionAssignor.super.subscriptionUserData(topics);
+        }
+
+        @Override
+        public GroupAssignment assign(Cluster metadata, GroupSubscription 
groupSubscription) {
+            return null;
+        }
+
+        @Override
+        public void onAssignment(Assignment assignment, ConsumerGroupMetadata 
metadata) {
+            ConsumerPartitionAssignor.super.onAssignment(assignment, metadata);
+        }
+
+        @Override
+        public List<RebalanceProtocol> supportedProtocols() {
+            return ConsumerPartitionAssignor.super.supportedProtocols();
+        }
+
+        @Override
+        public short version() {
+            return ConsumerPartitionAssignor.super.version();
+        }
+
+        @Override
+        public String name() {
+            // use the RangeAssignor's name to cause naming conflict
+            return new RangeAssignor().name();
+        }
+    }
+
     private ConsumerConfig initConsumerConfigWithClassTypes(List<Object> 
classTypes) {
         Properties props = new Properties();
         props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index d717036..57cd942 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -2837,6 +2837,17 @@ public class KafkaConsumerTest {
         }
     }
 
+    @Test
+    public void testAssignorNameConflict() {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        configs.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
+            Arrays.asList(RangeAssignor.class.getName(), 
ConsumerPartitionAssignorTest.TestConsumerPartitionAssignor.class.getName()));
+
+        assertThrows(KafkaException.class,
+            () -> new KafkaConsumer<>(configs, new StringDeserializer(), new 
StringDeserializer()));
+    }
+
     private static final List<String> CLIENT_IDS = new ArrayList<>();
     public static class DeserializerForClientId implements 
Deserializer<byte[]> {
         @Override

Reply via email to