GitHub user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/5958
  
    @FredTing we had some offline discussion on how to proceed with this.
    @aljoscha, @twalthr, or @StephanEwen can probably comment more here if I 
missed anything.
    
    The conflict that Stephan mentioned between a "common deserialization 
schema" interface and exposing connector-specific information is rooted in 
the fact that both concerns (deserialization and providing connector-specific 
record meta information) are currently coupled in a single interface.
    
    Take for example the Kafka connector's `KeyedDeserializationSchema` - there 
we try to deserialize the Kafka bytes, as well as provide information such as 
topic / partition / timestamp etc. to allow the user to enrich their user 
records for downstream business logic. The first part (deserialization of 
bytes) should be something common for all connector sources, while the second 
part is Kafka-specific.
    
    Therefore, we should perhaps break this up into two separate interfaces, as 
follows:
    ```
    // common interface for all sources (we already have this)
    interface DeserializationSchema<T> {
        T deserialize(byte[] bytes);
    }
    
    // ... and a Kafka-specific interface that is only used to provide record
    // meta information
    interface ConsumerRecordMetaInfoProvider<T> {
        T enrich(T record, ConsumerRecordMetaInfo metaInfo);
    }
    ```
    
    The second interface is something that each connector should have 
independently, and does not handle deserialization of the record bytes. The 
name, of course, is still open to discussion.
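    
    To make the split concrete, here is a rough sketch of how a consumer could 
chain the two interfaces. Only the two interface shapes come from the proposal 
above; the fields of `ConsumerRecordMetaInfo` and the `RecordPipeline` helper 
are hypothetical names made up for illustration and are not existing Flink API.

```java
// Common interface for all sources, as proposed above.
interface DeserializationSchema<T> {
    T deserialize(byte[] bytes);
}

// Hypothetical holder for Kafka record metadata; the field set is an assumption.
final class ConsumerRecordMetaInfo {
    final String topic;
    final int partition;
    final long timestamp;

    ConsumerRecordMetaInfo(String topic, int partition, long timestamp) {
        this.topic = topic;
        this.partition = partition;
        this.timestamp = timestamp;
    }
}

// Kafka-specific interface, as proposed above: it never sees the raw bytes.
interface ConsumerRecordMetaInfoProvider<T> {
    T enrich(T record, ConsumerRecordMetaInfo metaInfo);
}

// Hypothetical glue code showing the two steps a consumer would run per record.
final class RecordPipeline {
    static <T> T process(
            byte[] bytes,
            ConsumerRecordMetaInfo metaInfo,
            DeserializationSchema<T> schema,
            ConsumerRecordMetaInfoProvider<T> provider) {
        // Step 1: connector-agnostic deserialization of the record bytes.
        T record = schema.deserialize(bytes);
        // Step 2: optional connector-specific enrichment with meta information.
        return provider == null ? record : provider.enrich(record, metaInfo);
    }
}
```

    With this shape, a user who only needs the payload passes `null` (or a 
no-op) as the provider, and any existing `DeserializationSchema` can be reused 
unchanged across connectors.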
    
    What do you think?

