lindong28 commented on a change in pull request #18397:
URL: https://github.com/apache/flink/pull/18397#discussion_r790786440
##########
File path:
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchemaTest.java
##########
@@ -93,6 +112,29 @@ public void testKafkaValueDeserializerWrapper() throws
Exception {
assertEquals("world", collector.list.get(0));
}
+ @Test
+ public void testNoConfigurableKafkaSerializer() throws Exception {
+ final Map<String, String> config = ImmutableMap.of("simpleKey",
"simpleValue");
+ KafkaRecordDeserializationSchema<String> schema =
+
KafkaRecordDeserializationSchema.valueOnly(SimpleStringSerializer.class,
config);
+ schema.open(new TestingDeserializationContext());
+ Assertions.assertEquals(configuration, config);
+ assertFalse(isKeyDeserializer);
+ assertTrue(configurableConfiguration.isEmpty());
+ }
+
+ @Test
+ public void testConfigurableKafkaValueSerializer() throws Exception {
Review comment:
nits: The test name should probably mention `deserializer` instead of
`serializer`.
How about naming it `testKafkaValueDeserializerWrapperWithConfigurable` so
that its name is more consistent with the existing
`testKafkaValueDeserializerWrapper`?
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSerializerWrapper.java
##########
@@ -32,26 +32,33 @@
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
+/** Wrapper for Kafka {@link Serializer}. */
class KafkaSerializerWrapper<IN> implements SerializationSchema<IN> {
+
private final Class<? extends Serializer<? super IN>> serializerClass;
+ // whether the serializer is for key or value
Review comment:
nits: could we change `whether` to `Whether`?
##########
File path:
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchemaTest.java
##########
@@ -93,6 +112,29 @@ public void testKafkaValueDeserializerWrapper() throws
Exception {
assertEquals("world", collector.list.get(0));
}
+ @Test
+ public void testNoConfigurableKafkaSerializer() throws Exception {
Review comment:
nits: The test name should probably mention `deserializer` instead of
`serializer`.
How about naming it `testKafkaValueDeserializerWrapperWithoutConfigurable`
so that its name is more consistent with the existing
`testKafkaValueDeserializerWrapper`?
##########
File path:
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java
##########
@@ -160,6 +165,37 @@ public void testSerializeRecordWithKey() {
assertArrayEquals(record.value(), serializationSchema.serialize("a"));
}
+ @Test
+ public void testNoConfigurableKafkaKeySerializer() throws Exception {
+ final Map<String, String> config = ImmutableMap.of("simpleKey",
"simpleValue");
+ final KafkaRecordSerializationSchema<String> schema =
+ KafkaRecordSerializationSchema.builder()
+ .setTopic(DEFAULT_TOPIC)
+ // use StringSerializer as dummy Serializer, since
ValueSerializer is
Review comment:
nits: `use` -> `Use`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]