lindong28 commented on a change in pull request #18397:
URL: https://github.com/apache/flink/pull/18397#discussion_r790076382
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSerializerWrapper.java
##########
@@ -32,26 +32,46 @@
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
+/** Wrapper for Kafka {@link Serializer}. */
class KafkaSerializerWrapper<IN> implements SerializationSchema<IN> {
+
private final Class<? extends Serializer<? super IN>> serializerClass;
+ // whether the serializer is for key or value
+ private final boolean isKey;
private final Map<String, String> config;
private final Function<? super IN, String> topicSelector;
private transient Serializer<? super IN> serializer;
KafkaSerializerWrapper(
Class<? extends Serializer<? super IN>> serializerClass,
+ boolean isKey,
Map<String, String> config,
Function<? super IN, String> topicSelector) {
this.serializerClass = checkNotNull(serializerClass);
+ this.isKey = isKey;
this.config = checkNotNull(config);
this.topicSelector = checkNotNull(topicSelector);
}
KafkaSerializerWrapper(
Class<? extends Serializer<? super IN>> serializerClass,
+ Map<String, String> config,
+ Function<? super IN, String> topicSelector) {
+ this(serializerClass, false, config, topicSelector);
+ }
+
+ KafkaSerializerWrapper(
Review comment:
nits: Previously we have two constructors here. Now we have 4
constructors due to the addition of `isKey` parameter.
Would it be simpler to keep only two constructors and let caller always
explicitly provides the `isKey` value? This probably also makes the
`KafkaRecordSerializationSchemaBuilder` code more readable.
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaValueOnlyDeserializerWrapper.java
##########
@@ -36,10 +36,15 @@
/** A package private class to wrap {@link Deserializer}. */
class KafkaValueOnlyDeserializerWrapper<T> implements
KafkaRecordDeserializationSchema<T> {
+
private static final long serialVersionUID = 5409547407386004054L;
+
private static final Logger LOG =
LoggerFactory.getLogger(KafkaValueOnlyDeserializerWrapper.class);
+
private final Class<? extends Deserializer<T>> deserializerClass;
+ // always be false since this Deserializer is only used for value.
Review comment:
nits: The comment seems to be outdated since this value could be true.
And could we start the comment with an upper-case letter for consistency
with other comments?
##########
File path:
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java
##########
@@ -53,11 +54,15 @@
private static final String DEFAULT_TOPIC = "test";
+ private static Map<String, ?> configurableConfiguration;
Review comment:
hmm... would it be simpler to let `ConfigurableStringSerializer` use the
`configuration` below instead of creating this `configurableConfiguration`?
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.java
##########
@@ -108,24 +107,24 @@ default void
open(DeserializationSchema.InitializationContext context) throws Ex
*/
static <V> KafkaRecordDeserializationSchema<V> valueOnly(
Class<? extends Deserializer<V>> valueDeserializerClass) {
- return new KafkaValueOnlyDeserializerWrapper<>(
- valueDeserializerClass, Collections.emptyMap());
+ return valueOnly(valueDeserializerClass, Collections.emptyMap());
}
/**
* Wraps a Kafka {@link Deserializer} to a {@link
KafkaRecordDeserializationSchema}.
*
* @param valueDeserializerClass the deserializer class used to
deserialize the value.
- * @param config the configuration of the value deserializer, only valid
when the deserializer
- * is an implementation of {@code Configurable}.
+ * @param config the configuration of the value deserializer. If the
deserializer is an
Review comment:
nits: the doc seems to miss the explanation of what is invoked when the
deserializer is NOT an implementation of Configurable.
Would it be slightly better to use something like this:
```
The configuration of the value deserializer. If the deserializer is an
implementation of {@code Configurable},
{@link org.apache.kafka.common.Configurable#configure(Map)}
will be invoked with the given config. Otherwise, {@link
Deserializer#configure(Map, boolean)}
will be invoked with the given config.
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]