Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5958#discussion_r186967673
  
    --- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java
 ---
    @@ -42,14 +42,22 @@
     @Public
     public interface DeserializationSchema<T> extends Serializable, 
ResultTypeQueryable<T> {
     
    +   /**
    +    * @deprecated Use {@link #deserialize(ConsumerRecordMetaInfo)} .
    +    */
    +   @Deprecated
    +   T deserialize(byte[] message) throws IOException;
    +
        /**
         * Deserializes the byte message.
         *
    -    * @param message The message, as a byte array.
    +    * @param consumerRecordMetaInfossage The message, as a {@link 
ConsumerRecordMetaInfo}.
         *
         * @return The deserialized message as an object (null if the message 
cannot be deserialized).
         */
    -   T deserialize(byte[] message) throws IOException;
    +   default T deserialize(ConsumerRecordMetaInfo 
consumerRecordMetaInfossage) throws IOException {
    --- End diff --
    
    I would also vote for deprecating those classes and creating a specific 
version `KafkaDeserializationSchema`/`KafkaSerializationSchema`. I would also 
like to add a corresponding option to `SerializationSchema` to pass the 
targetTopic down, e.g. to be able to lookup appropriate schema in 
SchemaRegistry.
    
    I think changes like those does not fit well into a common space.


---

Reply via email to