lindong28 commented on a change in pull request #18397:
URL: https://github.com/apache/flink/pull/18397#discussion_r789248928



##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSerializerWrapper.java
##########
@@ -65,8 +77,15 @@ public void open(InitializationContext context) throws Exception {
                             serializerClass.getName(),
                             Serializer.class,
                             getClass().getClassLoader());
+
+            if (config.isEmpty()) {

Review comment:
       Is it possible that a user's `Serializer::configure(...)` could still be 
useful (e.g. because it has side effects) even when the config is empty?
   
   Would it be simpler and safer to remove this check?
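   
   A minimal sketch of the concern, not the actual Flink or Kafka code. 
`SketchSerializer` is a hypothetical stand-in that only mirrors the shape of 
Kafka's `Serializer::configure(Map, boolean)`, and the `serializer.encoding` 
key is used purely for illustration. The serializer falls back to a default 
inside `configure(...)`, so skipping the call for an empty config leaves it 
unusable:

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;

// Hypothetical stand-in mirroring the shape of Kafka's Serializer interface.
interface SketchSerializer<T> {
    void configure(Map<String, ?> configs, boolean isKey);

    byte[] serialize(String topic, T data);
}

// A serializer that only becomes usable once configure(...) has run.
class LazyCharsetSerializer implements SketchSerializer<String> {
    private Charset charset; // stays null until configure(...) is called

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // Side effect: a default is chosen even when no encoding is configured.
        Object encoding = configs.get("serializer.encoding");
        charset = encoding == null
                ? StandardCharsets.UTF_8
                : Charset.forName(encoding.toString());
    }

    @Override
    public byte[] serialize(String topic, String data) {
        return data.getBytes(charset); // throws NullPointerException if charset is null
    }
}

class EmptyConfigDemo {
    // Returns true if serialization succeeds under the given open(...) strategy.
    static boolean worksWith(boolean skipConfigureWhenEmpty) {
        Map<String, String> config = Collections.emptyMap();
        LazyCharsetSerializer serializer = new LazyCharsetSerializer();
        if (!(skipConfigureWhenEmpty && config.isEmpty())) {
            serializer.configure(config, false);
        }
        try {
            serializer.serialize("topic", "hello");
            return true;
        } catch (NullPointerException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("always configure: " + worksWith(false));
        System.out.println("skip when empty:  " + worksWith(true));
    }
}
```

   Calling `configure(...)` unconditionally keeps serializers like this one 
working, at the cost of one extra call with an empty map.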

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSerializerWrapper.java
##########
@@ -33,25 +33,37 @@
 import static org.apache.flink.util.Preconditions.checkState;
 
 class KafkaSerializerWrapper<IN> implements SerializationSchema<IN> {
+
     private final Class<? extends Serializer<? super IN>> serializerClass;
+    // whether the serializer is for key or value
+    private final boolean isKey;
     private final Map<String, String> config;
     private final Function<? super IN, String> topicSelector;
 
     private transient Serializer<? super IN> serializer;
 
     KafkaSerializerWrapper(
             Class<? extends Serializer<? super IN>> serializerClass,
+            boolean isKey,
             Map<String, String> config,
             Function<? super IN, String> topicSelector) {
         this.serializerClass = checkNotNull(serializerClass);
+        this.isKey = isKey;
         this.config = checkNotNull(config);
         this.topicSelector = checkNotNull(topicSelector);
     }
 
+    KafkaSerializerWrapper(
+            Class<? extends Serializer<? super IN>> serializerClass,
+            Map<String, String> config,
+            Function<? super IN, String> topicSelector) {
+        this(serializerClass, false, config, topicSelector);
+    }
+
     KafkaSerializerWrapper(
             Class<? extends Serializer<? super IN>> serializerClass,
             Function<? super IN, String> topicSelector) {
-        this(serializerClass, Collections.emptyMap(), topicSelector);
+        this(serializerClass, false, Collections.emptyMap(), topicSelector);

Review comment:
       This constructor is invoked by 
`KafkaRecordSerializationSchemaBuilder::setKafkaKeySerializer(...)`, so 
hard-coding `isKey=false` here is probably not intuitive: a key serializer 
would end up flagged as a value serializer.
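   
   To illustrate why the flag matters, here is a small sketch. 
`EncodingAwareSerializer` is a hypothetical class, loosely modeled on how 
Kafka's `StringSerializer` chooses between the `key.serializer.encoding` and 
`value.serializer.encoding` properties. It is not the Kafka implementation, 
and the encoding values are made up:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical serializer whose configuration depends on the isKey flag.
class EncodingAwareSerializer {
    private String encoding = "UTF-8";

    public void configure(Map<String, ?> configs, boolean isKey) {
        // The property that is read depends on whether this is a key serializer.
        String property = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
        Object value = configs.get(property);
        if (value != null) {
            encoding = value.toString();
        }
    }

    String encoding() {
        return encoding;
    }
}

class IsKeyDemo {
    // Configures a fresh serializer with the given flag and reports the encoding it picked.
    static String encodingFor(boolean isKey) {
        Map<String, String> config = new HashMap<>();
        config.put("key.serializer.encoding", "UTF-16");
        config.put("value.serializer.encoding", "ISO-8859-1");
        EncodingAwareSerializer serializer = new EncodingAwareSerializer();
        serializer.configure(config, isKey);
        return serializer.encoding();
    }

    public static void main(String[] args) {
        System.out.println("isKey=true  -> " + encodingFor(true));
        System.out.println("isKey=false -> " + encodingFor(false));
    }
}
```

   With `isKey=false`, a serializer registered through the key-serializer path 
would pick up the value-side setting in this sketch.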

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.java
##########
@@ -123,9 +121,8 @@ default void open(DeserializationSchema.InitializationContext context) throws Ex
      * @return A {@link KafkaRecordDeserializationSchema} that deserialize the value with the given
      *     deserializer.
      */
-    static <V, D extends Configurable & Deserializer<V>>
-            KafkaRecordDeserializationSchema<V> valueOnly(
-                    Class<D> valueDeserializerClass, Map<String, String> config) {
+    static <V, D extends Deserializer<V>> KafkaRecordDeserializationSchema<V> valueOnly(

Review comment:
       @fapaul I think the best approach is to use `Deserializer::configure(...)` 
to configure the deserializer. I believe all the deserializers provided 
off-the-shelf by Apache Kafka, and most user-implemented deserializers, use 
`Deserializer::configure(...)` rather than `Configurable`.
   
   `Configurable::configure(...)` is not deprecated in Kafka, and I believe 
Flink still supports it for backward compatibility. I agree it is useful 
to extend this method to also support `Deserializer::configure(...)`, as long 
as we can make sure we don't break users' existing applications.
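   
   One way this could look, as a rough sketch rather than the PR's actual 
code: `SketchConfigurable` and `SketchDeserializer` are hypothetical stand-ins 
that mirror the shape of Kafka's `Configurable` and `Deserializer`, and the 
dispatch order (prefer `Configurable` when present) is an assumption made to 
keep existing users' behavior unchanged:

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;

// Hypothetical stand-ins mirroring the shape of Kafka's Configurable and Deserializer.
interface SketchConfigurable {
    void configure(Map<String, ?> configs);
}

interface SketchDeserializer<T> {
    default void configure(Map<String, ?> configs, boolean isKey) {}

    T deserialize(String topic, byte[] data);
}

// Existing style: configured through the Configurable-like interface.
class LegacyDeserializer implements SketchDeserializer<String>, SketchConfigurable {
    @Override
    public void configure(Map<String, ?> configs) {}

    @Override
    public String deserialize(String topic, byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }
}

// Common style: relies only on the Deserializer-like configure(Map, boolean).
class PlainDeserializer implements SketchDeserializer<String> {
    @Override
    public String deserialize(String topic, byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }
}

class ConfigureDispatch {
    // Configures the deserializer and reports which path was taken.
    static String configure(
            SketchDeserializer<?> deserializer, Map<String, String> config, boolean isKey) {
        if (deserializer instanceof SketchConfigurable) {
            // Keep the old behavior for classes that already implement Configurable.
            ((SketchConfigurable) deserializer).configure(config);
            return "Configurable::configure";
        }
        deserializer.configure(config, isKey);
        return "Deserializer::configure";
    }

    public static void main(String[] args) {
        Map<String, String> config = Collections.emptyMap();
        System.out.println(configure(new LegacyDeserializer(), config, false));
        System.out.println(configure(new PlainDeserializer(), config, false));
    }
}
```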

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.java
##########
@@ -123,9 +121,8 @@ default void open(DeserializationSchema.InitializationContext context) throws Ex
      * @return A {@link KafkaRecordDeserializationSchema} that deserialize the value with the given
      *     deserializer.
      */
-    static <V, D extends Configurable & Deserializer<V>>
-            KafkaRecordDeserializationSchema<V> valueOnly(
-                    Class<D> valueDeserializerClass, Map<String, String> config) {
+    static <V, D extends Deserializer<V>> KafkaRecordDeserializationSchema<V> valueOnly(

Review comment:
       @JingGe I see. I agree it is still backward compatible (assuming we 
remove the `config.isEmpty()` check) if the user invokes this method with a 
`Configurable` instance.
   
   It seems that the Javadoc of this method still says `only valid when the 
deserializer is an implementation of {@code Configurable}`. Do we need to 
update this?




