JTaky commented on a change in pull request #11145:
[FLINK-15904][connectors/kafka] Make Kafka Consumer work with activated
"disableGenericTypes()"
URL: https://github.com/apache/flink/pull/11145#discussion_r382008879
##########
File path:
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
##########
@@ -675,39 +672,35 @@ private void
setupConsumer(FlinkKafkaConsumerBase<String> consumer) throws Excep
1);
}
+ /**
+ * Before using an explicit TypeSerializer for the partition state the
{@link
+ * FlinkKafkaConsumerBase} was creating a serializer using a {@link
TypeHint}. Here, we verify
+ * that the two methods create compatible serializers.
+ */
@Test
- public void testStateSerializerVsTypeInfo() throws Exception {
+ public void testExplicitStateSerializerCompatibility() throws Exception
{
ExecutionConfig executionConfig = new ExecutionConfig();
- TypeInformation<Tuple2<KafkaTopicPartition, Long>>
typeInformation = new TypeHint<Tuple2<KafkaTopicPartition, Long>>()
{}.getTypeInfo();
+ Tuple2<KafkaTopicPartition, Long> tuple =
+ new Tuple2<>(new KafkaTopicPartition("dummy",
0), 42L);
- Tuple2<KafkaTopicPartition, Long> tuple = new Tuple2<>(new
KafkaTopicPartition("dummy", 0), 42L);
+ // This is how the KafkaConsumerBase used to create the
TypeSerializer
+ TypeInformation<Tuple2<KafkaTopicPartition, Long>>
originalTypeHintTypeInfo =
+ new TypeHint<Tuple2<KafkaTopicPartition,
Long>>() {}.getTypeInfo();
+ TypeSerializer<Tuple2<KafkaTopicPartition, Long>>
serializerFromTypeHint =
+
originalTypeHintTypeInfo.createSerializer(executionConfig);
+ byte[] bytes =
InstantiationUtil.serializeToByteArray(serializerFromTypeHint, tuple);
- Tuple2<KafkaTopicPartition, Long> expectedTuple;
- {
- ByteArrayOutputStream outputStream = new
ByteArrayOutputStream(32);
- DataOutputViewStreamWrapper outputView = new
DataOutputViewStreamWrapper(outputStream);
- TypeSerializer<Tuple2<KafkaTopicPartition, Long>>
typeInfoSerializer = typeInformation.createSerializer(executionConfig);
- typeInfoSerializer.serialize(tuple, outputView);
- ByteArrayInputStream inputStream = new
ByteArrayInputStream(outputStream.toByteArray());
- DataInputView inputView = new
DataInputViewStreamWrapper(inputStream);
- expectedTuple =
typeInfoSerializer.deserialize(inputView);
- }
-
- Tuple2<KafkaTopicPartition, Long> actualTuple;
- {
- ByteArrayOutputStream outputStream = new
ByteArrayOutputStream(32);
- DataOutputViewStreamWrapper outputView = new
DataOutputViewStreamWrapper(outputStream);
-
typeInformation.createSerializer(executionConfig).serialize(tuple, outputView);
- ByteArrayInputStream inputStream = new
ByteArrayInputStream(outputStream.toByteArray());
- DataInputView inputView = new
DataInputViewStreamWrapper(inputStream);
- actualTuple =
FlinkKafkaConsumerBase.createStateSerializer(executionConfig).deserialize(inputView);
- }
+ // Directly use the Consumer to create the TypeSerializer
(using the new method)
+ TupleSerializer<Tuple2<KafkaTopicPartition, Long>>
kafkaConsumerSerializer =
+
FlinkKafkaConsumerBase.createStateSerializer(executionConfig);
+ Tuple2<KafkaTopicPartition, Long> actualTuple =
+
InstantiationUtil.deserializeFromByteArray(kafkaConsumerSerializer, bytes);
Assert.assertEquals(
- "Explicit KryoSerializer for
Tuple2<KafkaTopicPartition, Long> in not compatible with TypeHint based
serializer",
- expectedTuple,
- actualTuple
+ "New Serializer is not compatible with previous
method of creating Serializer using TypeHint.",
Review comment:
imho "new" is a very relative term and gives no information once this test
fails.
What about: "Explicit Serializer is not compatible with previous method of
creating Serializer using TypeHint."
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services