This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8bc4c33 KAFKA-13204: assignor name conflict check (#11217)
8bc4c33 is described below
commit 8bc4c334e36ac6fafe6e1d4a468da713336447e3
Author: Luke Chen <[email protected]>
AuthorDate: Thu Aug 19 08:23:29 2021 +0800
KAFKA-13204: assignor name conflict check (#11217)
Add a check for conflicting partition assignor names to avoid the wrong
assignor being used.
Reviewers: Anna Sophie Blee-Goldman <[email protected]>
---
.../consumer/ConsumerPartitionAssignor.java | 9 +++++
.../consumer/ConsumerPartitionAssignorTest.java | 45 ++++++++++++++++++++++
.../kafka/clients/consumer/KafkaConsumerTest.java | 11 ++++++
3 files changed, 65 insertions(+)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
index 0704e33..a541b8a 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients.consumer;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.Optional;
import java.util.Collections;
import java.util.List;
@@ -264,6 +265,8 @@ public interface ConsumerPartitionAssignor {
*/
static List<ConsumerPartitionAssignor> getAssignorInstances(List<String>
assignorClasses, Map<String, Object> configs) {
List<ConsumerPartitionAssignor> assignors = new ArrayList<>();
+ // a map to store assignor name -> assignor class name
+ Map<String, String> assignorNameMap = new HashMap<>();
if (assignorClasses == null)
return assignors;
@@ -284,6 +287,12 @@ public interface ConsumerPartitionAssignor {
((Configurable) assignor).configure(configs);
if (assignor instanceof ConsumerPartitionAssignor) {
+ String assignorName = ((ConsumerPartitionAssignor)
assignor).name();
+ if (assignorNameMap.containsKey(assignorName)) {
+ throw new KafkaException("The assignor name: '" +
assignorName + "' is used in more than one assignor: " +
+ assignorNameMap.get(assignorName) + ", " +
assignor.getClass().getName());
+ }
+ assignorNameMap.put(assignorName,
assignor.getClass().getName());
assignors.add((ConsumerPartitionAssignor) assignor);
} else {
throw new KafkaException(klass + " is not an instance of "
+ ConsumerPartitionAssignor.class.getName());
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignorTest.java
index 0307000..1298f8c 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignorTest.java
@@ -17,14 +17,17 @@
package org.apache.kafka.clients.consumer;
+import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.jupiter.api.Test;
+import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
+import java.util.Set;
import static
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
import static
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
@@ -99,6 +102,48 @@ public class ConsumerPartitionAssignorTest {
assertThrows(KafkaException.class, () ->
getAssignorInstances(classTypes, Collections.emptyMap()));
}
+ @Test
+ public void shouldThrowKafkaExceptionOnAssignorsWithSameName() {
+ assertThrows(KafkaException.class, () -> getAssignorInstances(
+ Arrays.asList(RangeAssignor.class.getName(),
TestConsumerPartitionAssignor.class.getName()),
+ Collections.emptyMap()
+ ));
+ }
+
+ public static class TestConsumerPartitionAssignor implements
ConsumerPartitionAssignor {
+
+ @Override
+ public ByteBuffer subscriptionUserData(Set<String> topics) {
+ return
ConsumerPartitionAssignor.super.subscriptionUserData(topics);
+ }
+
+ @Override
+ public GroupAssignment assign(Cluster metadata, GroupSubscription
groupSubscription) {
+ return null;
+ }
+
+ @Override
+ public void onAssignment(Assignment assignment, ConsumerGroupMetadata
metadata) {
+ ConsumerPartitionAssignor.super.onAssignment(assignment, metadata);
+ }
+
+ @Override
+ public List<RebalanceProtocol> supportedProtocols() {
+ return ConsumerPartitionAssignor.super.supportedProtocols();
+ }
+
+ @Override
+ public short version() {
+ return ConsumerPartitionAssignor.super.version();
+ }
+
+ @Override
+ public String name() {
+ // use the RangeAssignor's name to cause naming conflict
+ return new RangeAssignor().name();
+ }
+ }
+
private ConsumerConfig initConsumerConfigWithClassTypes(List<Object>
classTypes) {
Properties props = new Properties();
props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index d717036..57cd942 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -2837,6 +2837,17 @@ public class KafkaConsumerTest {
}
}
+ @Test
+ public void testAssignorNameConflict() {
+ Map<String, Object> configs = new HashMap<>();
+ configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+ configs.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
+ Arrays.asList(RangeAssignor.class.getName(),
ConsumerPartitionAssignorTest.TestConsumerPartitionAssignor.class.getName()));
+
+ assertThrows(KafkaException.class,
+ () -> new KafkaConsumer<>(configs, new StringDeserializer(), new
StringDeserializer()));
+ }
+
private static final List<String> CLIENT_IDS = new ArrayList<>();
public static class DeserializerForClientId implements
Deserializer<byte[]> {
@Override