aljoscha commented on a change in pull request #11145: [FLINK-15904][connectors/kafka] Make Kafka Consumer work with activated "disableGenericTypes()"
URL: https://github.com/apache/flink/pull/11145#discussion_r382128796
 
 

 ##########
 File path: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
 ##########
 @@ -675,39 +672,35 @@ private void 
setupConsumer(FlinkKafkaConsumerBase<String> consumer) throws Excep
                        1);
        }
 
+       /**
+        * Before using an explicit TypeSerializer for the partition state the 
{@link
+        * FlinkKafkaConsumerBase} was creating a serializer using a {@link 
TypeHint}. Here, we verify
+        * that the two methods create compatible serializers.
+        */
        @Test
-       public void testStateSerializerVsTypeInfo() throws Exception {
+       public void testExplicitStateSerializerCompatibility() throws Exception 
{
                ExecutionConfig executionConfig = new ExecutionConfig();
 
-               TypeInformation<Tuple2<KafkaTopicPartition, Long>> 
typeInformation = new TypeHint<Tuple2<KafkaTopicPartition, Long>>() 
{}.getTypeInfo();
+               Tuple2<KafkaTopicPartition, Long> tuple =
+                               new Tuple2<>(new KafkaTopicPartition("dummy", 
0), 42L);
 
-               Tuple2<KafkaTopicPartition, Long> tuple = new Tuple2<>(new 
KafkaTopicPartition("dummy", 0), 42L);
+               // This is how the KafkaConsumerBase used to create the 
TypeSerializer
+               TypeInformation<Tuple2<KafkaTopicPartition, Long>> 
originalTypeHintTypeInfo =
+                               new TypeHint<Tuple2<KafkaTopicPartition, 
Long>>() {}.getTypeInfo();
+               TypeSerializer<Tuple2<KafkaTopicPartition, Long>> 
serializerFromTypeHint =
+                               
originalTypeHintTypeInfo.createSerializer(executionConfig);
+               byte[] bytes = 
InstantiationUtil.serializeToByteArray(serializerFromTypeHint, tuple);
 
-               Tuple2<KafkaTopicPartition, Long> expectedTuple;
-               {
-                       ByteArrayOutputStream outputStream = new 
ByteArrayOutputStream(32);
-                       DataOutputViewStreamWrapper outputView = new 
DataOutputViewStreamWrapper(outputStream);
-                       TypeSerializer<Tuple2<KafkaTopicPartition, Long>> 
typeInfoSerializer = typeInformation.createSerializer(executionConfig);
-                       typeInfoSerializer.serialize(tuple, outputView);
-                       ByteArrayInputStream inputStream = new 
ByteArrayInputStream(outputStream.toByteArray());
-                       DataInputView inputView = new 
DataInputViewStreamWrapper(inputStream);
-                       expectedTuple = 
typeInfoSerializer.deserialize(inputView);
-               }
-
-               Tuple2<KafkaTopicPartition, Long> actualTuple;
-               {
-                       ByteArrayOutputStream outputStream = new 
ByteArrayOutputStream(32);
-                       DataOutputViewStreamWrapper outputView = new 
DataOutputViewStreamWrapper(outputStream);
-                       
typeInformation.createSerializer(executionConfig).serialize(tuple, outputView);
-                       ByteArrayInputStream inputStream = new 
ByteArrayInputStream(outputStream.toByteArray());
-                       DataInputView inputView = new 
DataInputViewStreamWrapper(inputStream);
-                       actualTuple = 
FlinkKafkaConsumerBase.createStateSerializer(executionConfig).deserialize(inputView);
-               }
+               // Directly use the Consumer to create the TypeSerializer 
(using the new method)
+               TupleSerializer<Tuple2<KafkaTopicPartition, Long>> 
kafkaConsumerSerializer =
+                               
FlinkKafkaConsumerBase.createStateSerializer(executionConfig);
+               Tuple2<KafkaTopicPartition, Long> actualTuple =
+                               
InstantiationUtil.deserializeFromByteArray(kafkaConsumerSerializer, bytes);
 
                Assert.assertEquals(
-                       "Explicit KryoSerializer for 
Tuple2<KafkaTopicPartition, Long> in not compatible with TypeHint based 
serializer",
-                       expectedTuple,
-                       actualTuple
+                               "New Serializer is not compatible with previous 
method of creating Serializer using TypeHint.",
 
 Review comment:
   Yes, will change that. 👌 
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to