alexeyt820 commented on a change in pull request #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers
URL: https://github.com/apache/flink/pull/6615#discussion_r255309723
 
 

 ##########
 File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
 ##########
 @@ -55,4 +57,13 @@
         * @return null or the target topic
         */
        String getTargetTopic(T element);
+
 +       /**
 +        * Optional method to determine the Kafka headers for the element.
 +        *
 +        * @param element The incoming element to be serialized
 +        * @return collection of headers (may be empty)
 +        */
 +       default Iterable<Map.Entry<String, byte[]>> headers(T element) {
 
 Review comment:
   So we still need two KafkaSerializationSchema adapters: one for base, 0.8, and 0.9, which create a ProducerRecord without a timestamp, and one with a timestamp for the remaining versions?
   
   Regarding `open`, I guess we can do it, but if someone has a non-trivial overridden `open` and wants to migrate to the new `KafkaSerializationSchema`, it will be more challenging, because they will need to move the logic of `open`, which was called once, into `serialize`, which is called for each record. That is likely a rare case, and it is feasible to cache the results of the "open" work during `serialize` in the `KafkaSerializationSchema`. So let's consider it acceptable?
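   The caching idea could look roughly like the sketch below: the one-time setup that used to live in `open` is performed lazily on the first `serialize` call and reused afterwards. `LazySchema` and `ExpensiveResource` are illustrative names, not part of the Flink or Kafka API.
   
   ```java
   // Stand-in for a costly object that would previously have been built in open().
   class ExpensiveResource {
       final String config;
   
       ExpensiveResource(String config) {
           this.config = config;
       }
   
       byte[] encode(String value) {
           return (config + ":" + value).getBytes();
       }
   }
   
   // Hypothetical schema that caches its one-time setup across serialize() calls.
   class LazySchema {
       private transient ExpensiveResource resource; // built once, then reused
   
       byte[] serialize(String element) {
           if (resource == null) {
               // Runs only for the first record, mimicking a one-time open().
               resource = new ExpensiveResource("cfg");
           }
           return resource.encode(element);
       }
   }
   
   public class LazyInitDemo {
       public static void main(String[] args) {
           LazySchema schema = new LazySchema();
           System.out.println(new String(schema.serialize("a"))); // cfg:a
           System.out.println(new String(schema.serialize("b"))); // cfg:b
       }
   }
   ```
   
   The `null` check keeps the per-record overhead to a single branch after the first call, so moving `open`-style logic into `serialize` this way should not be a performance concern.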

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services