This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new b8090ad KAFKA-10326: Both serializer and deserializer should be able to see generated ID (#9102) b8090ad is described below commit b8090add335903e6daf38ea99d0f94be5a5e6ed4 Author: Chia-Ping Tsai <chia7...@gmail.com> AuthorDate: Thu Oct 1 11:23:07 2020 +0800 KAFKA-10326: Both serializer and deserializer should be able to see generated ID (#9102) Reviewers: Boyang Chen <boy...@confluent.io>, Guozhang Wang <wangg...@gmail.com> --- .../kafka/clients/consumer/KafkaConsumer.java | 4 ++-- .../kafka/clients/producer/KafkaProducer.java | 4 ++-- .../apache/kafka/common/config/AbstractConfig.java | 7 ++++++ .../kafka/clients/consumer/KafkaConsumerTest.java | 27 ++++++++++++++++++++++ .../kafka/clients/producer/KafkaProducerTest.java | 27 ++++++++++++++++++++++ .../kafka/common/config/AbstractConfigTest.java | 9 ++++++++ 6 files changed, 74 insertions(+), 4 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 91070f9..ac996cd 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -704,14 +704,14 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { this.interceptors = new ConsumerInterceptors<>(interceptorList); if (keyDeserializer == null) { this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class); - this.keyDeserializer.configure(config.originals(), true); + this.keyDeserializer.configure(config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId)), true); } else { config.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG); this.keyDeserializer = keyDeserializer; } if (valueDeserializer == null) { this.valueDeserializer = 
config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class); - this.valueDeserializer.configure(config.originals(), false); + this.valueDeserializer.configure(config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId)), false); } else { config.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG); this.valueDeserializer = valueDeserializer; diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 12ecc5c..ebd1746 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -363,7 +363,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { if (keySerializer == null) { this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class); - this.keySerializer.configure(config.originals(), true); + this.keySerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), true); } else { config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG); this.keySerializer = keySerializer; @@ -371,7 +371,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { if (valueSerializer == null) { this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class); - this.valueSerializer.configure(config.originals(), false); + this.valueSerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), false); } else { config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG); this.valueSerializer = valueSerializer; diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java index 968c549..a747e50 100644 --- 
a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java @@ -228,6 +228,13 @@ public class AbstractConfig { return copy; } + public Map<String, Object> originals(Map<String, Object> configOverrides) { + Map<String, Object> copy = new RecordingMap<>(); + copy.putAll(originals); + copy.putAll(configOverrides); + return copy; + } + /** * Get all the original settings, ensuring that all values are of type String. * @return the original settings diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 401dde2..1248845 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -2535,4 +2535,31 @@ public class KafkaConsumerTest { assertEquals(countingRebalanceListener.revokedCount, 1); } + + @Test + public void deserializerShouldSeeGeneratedClientId() { + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, DeserializerForClientId.class.getName()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DeserializerForClientId.class.getName()); + + KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props); + assertEquals(2, DeserializerForClientId.CLIENT_IDS.size()); + assertEquals(DeserializerForClientId.CLIENT_IDS.get(0), consumer.getClientId()); + assertEquals(DeserializerForClientId.CLIENT_IDS.get(1), consumer.getClientId()); + consumer.close(); + } + + public static class DeserializerForClientId implements Deserializer<byte[]> { + static final List<String> CLIENT_IDS = new ArrayList<>(); + @Override + public void configure(Map<String, ?> configs, boolean isKey) { + 
CLIENT_IDS.add(configs.get(ConsumerConfig.CLIENT_ID_CONFIG).toString()); + } + + @Override + public byte[] deserialize(String topic, byte[] data) { + return data; + } + } } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index 68667b9..0c809f5 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -1276,4 +1276,31 @@ public class KafkaProducerTest { new LogContext(), new ClusterResourceListeners(), Time.SYSTEM); } + @Test + public void serializerShouldSeeGeneratedClientId() { + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, SerializerForClientId.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SerializerForClientId.class.getName()); + + KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props); + assertEquals(2, SerializerForClientId.CLIENT_IDS.size()); + assertEquals(SerializerForClientId.CLIENT_IDS.get(0), producer.getClientId()); + assertEquals(SerializerForClientId.CLIENT_IDS.get(1), producer.getClientId()); + producer.close(); + } + + public static class SerializerForClientId implements Serializer<byte[]> { + static final List<String> CLIENT_IDS = new ArrayList<>(); + @Override + public void configure(Map<String, ?> configs, boolean isKey) { + CLIENT_IDS.add(configs.get(ProducerConfig.CLIENT_ID_CONFIG).toString()); + } + + @Override + public byte[] serialize(String topic, byte[] data) { + return data; + } + } + } diff --git a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java index 73f83d7..04085ae 100644 --- 
a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java +++ b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java @@ -335,6 +335,15 @@ public class AbstractConfigTest { } @Test + public void testOriginalWithOverrides() { + Properties props = new Properties(); + props.put("config.providers", "file"); + TestIndirectConfigResolution config = new TestIndirectConfigResolution(props); + assertEquals(config.originals().get("config.providers"), "file"); + assertEquals(config.originals(Collections.singletonMap("config.providers", "file2")).get("config.providers"), "file2"); + } + + @Test public void testOriginalsWithConfigProvidersProps() { Properties props = new Properties();