lindong28 commented on a change in pull request #18397:
URL: https://github.com/apache/flink/pull/18397#discussion_r789248928
##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSerializerWrapper.java
##########
@@ -65,8 +77,15 @@ public void open(InitializationContext context) throws Exception {
                        serializerClass.getName(),
                        Serializer.class,
                        getClass().getClassLoader());
+
+        if (config.isEmpty()) {
Review comment:
Is it possible that the user's serializer::configure(...) could still be
useful (e.g. it has side effects) even when the config is empty?
Would it be simpler and safer to remove this check?
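For illustration, here is a minimal sketch of what dropping the guard could look like; the class name `AlwaysConfiguringSerializerWrapper` and the simplified fields are hypothetical, only the `configure(config, isKey)` call mirrors the Kafka `Serializer` API:

```java
import java.util.Map;

import org.apache.kafka.common.serialization.Serializer;

// Hypothetical, simplified wrapper: configure(...) is always invoked, even when the
// config map is empty, so a serializer whose configure(...) has side effects still runs it.
class AlwaysConfiguringSerializerWrapper<IN> {

    private final Class<? extends Serializer<? super IN>> serializerClass;
    private final boolean isKey;
    private final Map<String, String> config;
    private transient Serializer<? super IN> serializer;

    AlwaysConfiguringSerializerWrapper(
            Class<? extends Serializer<? super IN>> serializerClass,
            boolean isKey,
            Map<String, String> config) {
        this.serializerClass = serializerClass;
        this.isKey = isKey;
        this.config = config;
    }

    void open() throws Exception {
        serializer = serializerClass.getDeclaredConstructor().newInstance();
        // No config.isEmpty() check: the serializer always gets to see the (possibly empty) config.
        serializer.configure(config, isKey);
    }
}
```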
##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSerializerWrapper.java
##########
@@ -33,25 +33,37 @@
import static org.apache.flink.util.Preconditions.checkState;
class KafkaSerializerWrapper<IN> implements SerializationSchema<IN> {
+
    private final Class<? extends Serializer<? super IN>> serializerClass;
+    // whether the serializer is for key or value
+    private final boolean isKey;
    private final Map<String, String> config;
    private final Function<? super IN, String> topicSelector;
    private transient Serializer<? super IN> serializer;
    KafkaSerializerWrapper(
            Class<? extends Serializer<? super IN>> serializerClass,
+            boolean isKey,
            Map<String, String> config,
            Function<? super IN, String> topicSelector) {
        this.serializerClass = checkNotNull(serializerClass);
+        this.isKey = isKey;
        this.config = checkNotNull(config);
        this.topicSelector = checkNotNull(topicSelector);
    }
+    KafkaSerializerWrapper(
+            Class<? extends Serializer<? super IN>> serializerClass,
+            Map<String, String> config,
+            Function<? super IN, String> topicSelector) {
+        this(serializerClass, false, config, topicSelector);
+    }
+
    KafkaSerializerWrapper(
            Class<? extends Serializer<? super IN>> serializerClass,
            Function<? super IN, String> topicSelector) {
-        this(serializerClass, Collections.emptyMap(), topicSelector);
+        this(serializerClass, false, Collections.emptyMap(), topicSelector);
Review comment:
This constructor is invoked by
`KafkaRecordSerializationSchemaBuilder::setKafkaKeySerializer(...)`. It is
probably not intuitive to set `isKey=false` here.
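One possible way to make the call sites self-documenting (just a sketch, not what this PR does) is to expose explicit key/value factory methods so no caller has to pass `isKey=false` for a key serializer; the class `SerializerWrappers` and its method names are hypothetical:

```java
import java.util.Collections;
import java.util.function.Function;

import org.apache.kafka.common.serialization.Serializer;

// Hypothetical factory methods: each call site states whether the serializer is for the
// record key or the record value, so no constructor silently defaults to isKey=false.
final class SerializerWrappers {

    private SerializerWrappers() {}

    static <IN> KafkaSerializerWrapper<IN> forKey(
            Class<? extends Serializer<? super IN>> serializerClass,
            Function<? super IN, String> topicSelector) {
        return new KafkaSerializerWrapper<>(
                serializerClass, true, Collections.emptyMap(), topicSelector);
    }

    static <IN> KafkaSerializerWrapper<IN> forValue(
            Class<? extends Serializer<? super IN>> serializerClass,
            Function<? super IN, String> topicSelector) {
        return new KafkaSerializerWrapper<>(
                serializerClass, false, Collections.emptyMap(), topicSelector);
    }
}
```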
##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.java
##########
@@ -123,9 +121,8 @@ default void open(DeserializationSchema.InitializationContext context) throws Ex
     * @return A {@link KafkaRecordDeserializationSchema} that deserialize the value with the given
     *     deserializer.
     */
-    static <V, D extends Configurable & Deserializer<V>>
-            KafkaRecordDeserializationSchema<V> valueOnly(
-                    Class<D> valueDeserializerClass, Map<String, String> config) {
+    static <V, D extends Deserializer<V>> KafkaRecordDeserializationSchema<V> valueOnly(
Review comment:
@fapaul I think the best way is to use `Deserializer::configure(...)` to
configure deserializer. And I believe all the deserializers provided
off-the-shelf by Apache Kafka, and most user-implemented deserializer, use
`Deserializer::configure(...)` instead of `Configurable`.
`Configurable::configure(...)` is not deprecated in Kafka and I believe
Flink still supports this for backward compatibility. And I agree it is useful
to extend the functionality of this method to support
`Deserializer::configure(...)` as long as we could make sure we don't break
users existing application.
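To make this concrete, here is a minimal sketch of configuring via `Deserializer::configure(...)` while still honoring implementations that rely on `Configurable::configure(...)`; the helper class `DeserializerConfigurer` is hypothetical:

```java
import java.util.Map;

import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.serialization.Deserializer;

// Hypothetical helper: prefer Deserializer::configure(Map, boolean), which Kafka's
// off-the-shelf deserializers implement, but also call Configurable::configure(Map)
// when the user's class implements it, to avoid breaking existing applications.
final class DeserializerConfigurer {

    private DeserializerConfigurer() {}

    static <V> void configure(
            Deserializer<V> deserializer, Map<String, String> config, boolean isKey) {
        // Deserializer::configure is a no-op default method unless the implementation
        // overrides it, so calling it unconditionally is safe.
        deserializer.configure(config, isKey);
        if (deserializer instanceof Configurable) {
            // Keep the behavior users relied on when valueOnly(...) required
            // `Configurable & Deserializer<V>`.
            ((Configurable) deserializer).configure(config);
        }
    }
}
```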
##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.java
##########
@@ -123,9 +121,8 @@ default void open(DeserializationSchema.InitializationContext context) throws Ex
     * @return A {@link KafkaRecordDeserializationSchema} that deserialize the value with the given
     *     deserializer.
     */
-    static <V, D extends Configurable & Deserializer<V>>
-            KafkaRecordDeserializationSchema<V> valueOnly(
-                    Class<D> valueDeserializerClass, Map<String, String> config) {
+    static <V, D extends Deserializer<V>> KafkaRecordDeserializationSchema<V> valueOnly(
Review comment:
@JingGe I see. I agree it is still backward compatible (suppose we
remove the `config.isEmpty()` check) if user invokes this method with a
`Configurable` instance.
It seems that the Java doc of this method still says `only valid when the
deserializer is an implementation of {@code Configurable}`. Do we need to
update this?
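For reference, the Javadoc fix being asked about might read roughly like this (wording is only a suggestion):

```java
/**
 * Wraps a Kafka {@link org.apache.kafka.common.serialization.Deserializer} to deserialize
 * the value of a ConsumerRecord.
 *
 * <p>The given config is passed to {@code Deserializer#configure(Map, boolean)}; the
 * deserializer no longer has to implement {@code org.apache.kafka.common.Configurable}.
 *
 * @param valueDeserializerClass the deserializer class used to deserialize the value.
 * @param config the configuration handed to the deserializer's configure(...) method.
 * @param <V> the value type.
 * @return A {@link KafkaRecordDeserializationSchema} that deserializes the value with the
 *     given deserializer.
 */
```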