[
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480505#comment-16480505
]
ASF GitHub Bot commented on FLINK-8500:
---------------------------------------
Github user tzulitai commented on the issue:
https://github.com/apache/flink/pull/5958
@FredTing we had some offline discussion on how to proceed with this.
@aljoscha, @twalthr, or @StephanEwen can probably comment more here if I
missed anything.
The conflict that Stephan mentioned between a "common deserialization
schema" interface and exposing connector-specific information is rooted in
the fact that both concerns (deserialization and providing connector-specific
record meta information) are currently coupled in a single interface.
Take for example the Kafka connector's `KeyedDeserializationSchema` - there
we try to deserialize the Kafka bytes, as well as provide information such as
topic / partition / timestamp etc. to allow the user to enrich their user
records for downstream business logic. The first part (deserialization of
bytes) should be something common for all connector sources, while the second
part is Kafka-specific.
Therefore, we should perhaps break this up into two separate interfaces, as
follows:
```
// common interface for all sources (we already have this)
interface DeserializationSchema<T> {
    T deserialize(byte[] bytes);
}

// ... and a Kafka-specific interface that is only used to provide record
// meta information
interface ConsumerRecordMetaInfoProvider<T> {
    T enrich(T record, ConsumerRecordMetaInfo metaInfo);
}
```
The second interface is something that each connector should have
independently, and does not handle deserialization of the record bytes. The
name, of course, is still open to discussion.
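As a rough sketch of how the two pieces might be combined inside a consumer:
the fields of `ConsumerRecordMetaInfo` and the `process` / `demo` helpers
below are assumptions made up for illustration, not existing Flink API.

```java
import java.nio.charset.StandardCharsets;

// Common interface for all sources, reduced to the single method shown above.
interface DeserializationSchema<T> {
    T deserialize(byte[] bytes);
}

// Hypothetical holder for Kafka record metadata; the field set is an assumption.
final class ConsumerRecordMetaInfo {
    final String topic;
    final int partition;
    final long timestamp;

    ConsumerRecordMetaInfo(String topic, int partition, long timestamp) {
        this.topic = topic;
        this.partition = partition;
        this.timestamp = timestamp;
    }
}

// Kafka-specific hook that only enriches an already deserialized record.
interface ConsumerRecordMetaInfoProvider<T> {
    T enrich(T record, ConsumerRecordMetaInfo metaInfo);
}

final class SplitSchemaSketch {
    // Hypothetical glue code: deserialize first, then optionally enrich.
    static <T> T process(byte[] value,
                         ConsumerRecordMetaInfo metaInfo,
                         DeserializationSchema<T> schema,
                         ConsumerRecordMetaInfoProvider<T> provider) {
        T record = schema.deserialize(value);
        // A job that does not need meta information simply passes no provider.
        return provider == null ? record : provider.enrich(record, metaInfo);
    }

    // Small usage example: append the Kafka timestamp to a String record.
    static String demo() {
        DeserializationSchema<String> schema =
                bytes -> new String(bytes, StandardCharsets.UTF_8);
        ConsumerRecordMetaInfoProvider<String> provider =
                (record, meta) -> record + "@" + meta.timestamp;
        ConsumerRecordMetaInfo metaInfo =
                new ConsumerRecordMetaInfo("events", 0, 1526630400000L);
        return process("hello".getBytes(StandardCharsets.UTF_8),
                metaInfo, schema, provider);
    }
}
```

With this split, `demo()` returns `hello@1526630400000`, and the
`DeserializationSchema` used there could be shared unchanged with any other
connector, since it never sees Kafka-specific types.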
What do you think?
> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---------------------------------------------------------------------------
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Affects Versions: 1.4.0
> Reporter: yanxiaobin
> Priority: Major
> Fix For: 1.6.0
>
> Attachments: image-2018-01-30-14-58-58-167.png,
> image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema needs a parameter
> 'kafka message timestamp' (from ConsumerRecord). In some business scenarios,
> this is useful!
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)