[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16471754#comment-16471754 ]
ASF GitHub Bot commented on FLINK-8500:
---------------------------------------
Github user StephanEwen commented on the issue:
https://github.com/apache/flink/pull/5958
There are a few bigger design aspects that we need to agree upon:
- The `DeserializationSchema` is a shared common denominator of
serialization schemata. That's why it is in `flink-core` and not in a Kafka
connector project. It is used by various per-record streaming sources, such as
Kafka and RabbitMQ, and in the future PubSub or AMQ. It may be valuable to
migrate Kinesis to it as well. This PR changes the common denominator to have
very Kafka-specific fields.
- The common schema makes sense because we can offer a library of
implementations, e.g. for Avro, JSON, Thrift, and Protobuf. All connectors can
use hardened implementations for these types, or integrations with schema
registries.
- This surfaces, for example, in the SQL / Table API, which is currently
reworking its source API so that the "connector" and "format" aspects are
orthogonal. You define a table as "from Kafka with Avro", or "from
row-wise file with JSON", etc.
- We should think about having something similar in the unified
batch/streaming DataStream API in the future as well, when we rework our source
interface: at the very least a "row-wise source" that can then use all these
format implementations.
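To make the "connector" vs. "format" split concrete, here is a minimal Java sketch. It assumes a simplified, hypothetical `SimpleDeserializationSchema` interface rather than Flink's actual `DeserializationSchema` (which additionally has `isEndOfStream` and reports its produced type), and a hypothetical in-memory `InMemoryRowWiseSource` standing in for a real connector. The point is only that the source delivers raw bytes and delegates decoding to whatever format it is given.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified format interface: turns bytes into T and knows
// nothing about which connector the bytes came from.
interface SimpleDeserializationSchema<T> {
    T deserialize(byte[] message);
}

// One "format": UTF-8 text.
class Utf8StringSchema implements SimpleDeserializationSchema<String> {
    @Override
    public String deserialize(byte[] message) {
        return new String(message, StandardCharsets.UTF_8);
    }
}

// Another "format": a 4-byte big-endian int (ByteBuffer's default byte order).
// Throws BufferUnderflowException if the message is shorter than 4 bytes.
class BigEndianIntSchema implements SimpleDeserializationSchema<Integer> {
    @Override
    public Integer deserialize(byte[] message) {
        return ByteBuffer.wrap(message).getInt();
    }
}

// Hypothetical "connector": it only hands out raw records and delegates
// decoding to the configured format, so any format can be plugged in.
class InMemoryRowWiseSource<T> {
    private final List<byte[]> rawRecords;
    private final SimpleDeserializationSchema<T> schema;

    InMemoryRowWiseSource(List<byte[]> rawRecords, SimpleDeserializationSchema<T> schema) {
        this.rawRecords = rawRecords;
        this.schema = schema;
    }

    List<T> readAll() {
        List<T> out = new ArrayList<>();
        for (byte[] raw : rawRecords) {
            out.add(schema.deserialize(raw));
        }
        return out;
    }
}
```

The same source class works unchanged with either format, which is the property a shared format library relies on.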
That means we are in a bit of a conflict between a common deserialization
schema interface and surfacing connector-specific information.
One way to approach that might be to make the connector-specific
deserializer classes subclasses of the common one, and let them use specialized
subclasses of `ConsumerRecordMetaInfo` that have the additional fields.
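A minimal Java sketch of that subclassing idea, under stated assumptions: every name here (`SourceRecord`, `KafkaSourceRecord`, `RecordDeserializationSchema`, `KafkaRecordDeserializationSchema`, `Utf8Schema`, `TimestampedUtf8Schema`) is hypothetical and not part of Flink's API. `SourceRecord` borrows one of the alternative names suggested in this comment for what the PR calls `ConsumerRecordMetaInfo`. The common schema is parameterized by the record type it reads, so generic formats stay connector-agnostic while Kafka-specific schemas can see fields like the timestamp.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical connector-agnostic record: just the payload.
class SourceRecord {
    private final byte[] value;

    SourceRecord(byte[] value) {
        this.value = value;
    }

    byte[] value() {
        return value;
    }
}

// Hypothetical Kafka-specific subclass: adds fields only Kafka provides.
class KafkaSourceRecord extends SourceRecord {
    private final String topic;
    private final int partition;
    private final long offset;
    private final long timestamp;

    KafkaSourceRecord(byte[] value, String topic, int partition, long offset, long timestamp) {
        super(value);
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.timestamp = timestamp;
    }

    String topic() { return topic; }
    int partition() { return partition; }
    long offset() { return offset; }
    long timestamp() { return timestamp; }
}

// Common schema, parameterized by the record type it can read.
interface RecordDeserializationSchema<R extends SourceRecord, T> {
    T deserialize(R record);
}

// Connector-specific sub-interface, fixed to Kafka records.
interface KafkaRecordDeserializationSchema<T>
        extends RecordDeserializationSchema<KafkaSourceRecord, T> {
}

// Reusable format: only reads value(), so it works with any record type.
class Utf8Schema<R extends SourceRecord> implements RecordDeserializationSchema<R, String> {
    @Override
    public String deserialize(R record) {
        return new String(record.value(), StandardCharsets.UTF_8);
    }
}

// Kafka-only schema that uses the message timestamp (the FLINK-8500 use case).
class TimestampedUtf8Schema implements KafkaRecordDeserializationSchema<String> {
    @Override
    public String deserialize(KafkaSourceRecord record) {
        return record.timestamp() + ":" + new String(record.value(), StandardCharsets.UTF_8);
    }
}
```

With this shape, a Kafka consumer could accept any `RecordDeserializationSchema<KafkaSourceRecord, T>`: either a generic format such as `Utf8Schema<KafkaSourceRecord>` or a Kafka-aware schema like `TimestampedUtf8Schema`.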
On a separate note, I think that `ConsumerRecordMetaInfo` is not the best
name, because the type contains not only the meta info but also the actual
record. So we could call it `Record`, `Datum`, `SourceRecord`, etc.
> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---------------------------------------------------------------------------
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Affects Versions: 1.4.0
> Reporter: yanxiaobin
> Priority: Major
> Fix For: 1.6.0
>
> Attachments: image-2018-01-30-14-58-58-167.png,
> image-2018-01-31-10-48-59-633.png
>
>
> The deserialize method of KeyedDeserializationSchema needs a parameter for
> the Kafka message timestamp (from ConsumerRecord). In some business
> scenarios, this is useful!
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)