[
https://issues.apache.org/jira/browse/FLINK-4019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15367542#comment-15367542
]
ASF GitHub Bot commented on FLINK-4019:
---------------------------------------
Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/flink/pull/2214#discussion_r70059282
--- Diff:
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
---
@@ -491,13 +491,14 @@ protected Properties getConsumerConfiguration() {
* This method is called by {@link ShardConsumer}s.
*
* @param record the record to collect
+ * @param recordTimestamp timestamp to attach to the collected record
* @param shardStateIndex index of the shard to update in subscribedShardsState;
* this index should be the returned value from
* {@link KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)}, called
* when the shard state was registered.
* @param lastSequenceNumber the last sequence number value to update
*/
- protected void emitRecordAndUpdateState(T record, int shardStateIndex, SequenceNumber lastSequenceNumber) {
+ protected void emitRecordAndUpdateState(T record, long recordTimestamp, int shardStateIndex, SequenceNumber lastSequenceNumber) {
synchronized (checkpointLock) {
sourceContext.collect(record);
--- End diff --
Did you also consider passing the record to the
`sourceContext.collectWithTimestamp()` method in addition to passing it to the
serialization schema?
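
For illustration, here is a minimal sketch of what that suggestion could look like. It is not the code from the pull request. `SourceContext` below is a reduced stand-in for Flink's `SourceFunction.SourceContext`, keeping only `collect()` and `collectWithTimestamp()`; `RecordingContext` and `Fetcher` are hypothetical helpers, a plain `String` stands in for `SequenceNumber`, and the shard state index is left out.

```java
import java.util.ArrayList;
import java.util.List;

class TimestampedEmitSketch {

    /** Reduced stand-in for Flink's SourceFunction.SourceContext (assumption: only these two methods matter here). */
    interface SourceContext<T> {
        void collect(T element);
        void collectWithTimestamp(T element, long timestamp);
    }

    /** Hypothetical context that records each emitted element together with its timestamp. */
    static final class RecordingContext<T> implements SourceContext<T> {
        final List<String> emitted = new ArrayList<>();

        @Override
        public void collect(T element) {
            emitted.add(element + "@none");
        }

        @Override
        public void collectWithTimestamp(T element, long timestamp) {
            emitted.add(element + "@" + timestamp);
        }
    }

    /** Simplified fetcher: a String replaces SequenceNumber and the shard state index is omitted. */
    static final class Fetcher<T> {
        private final Object checkpointLock = new Object();
        private final SourceContext<T> sourceContext;
        String lastSequenceNumber;

        Fetcher(SourceContext<T> sourceContext) {
            this.sourceContext = sourceContext;
        }

        void emitRecordAndUpdateState(T record, long recordTimestamp, String lastSequenceNumber) {
            synchronized (checkpointLock) {
                // Attach the arrival timestamp to the emitted record instead of calling collect(record).
                sourceContext.collectWithTimestamp(record, recordTimestamp);
                // Update the state under the same lock so a checkpoint sees both changes or neither.
                this.lastSequenceNumber = lastSequenceNumber;
            }
        }
    }

    public static void main(String[] args) {
        RecordingContext<String> ctx = new RecordingContext<>();
        Fetcher<String> fetcher = new Fetcher<>(ctx);
        fetcher.emitRecordAndUpdateState("record-1", 1468000000000L, "seq-1");
        System.out.println(ctx.emitted);
    }
}
```

Whether the approximate arrival time is a suitable record timestamp for event-time processing is a separate design question; the sketch only shows where the call would go.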
> Expose approximateArrivalTimestamp through the KinesisDeserializationSchema interface
> -------------------------------------------------------------------------------------
>
> Key: FLINK-4019
> URL: https://issues.apache.org/jira/browse/FLINK-4019
> Project: Flink
> Issue Type: Sub-task
> Components: Streaming Connectors
> Affects Versions: 1.1.0
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.1.0
>
>
> Amazon's Record class also exposes the approximate timestamp at which
> Kinesis successfully received the record:
> http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/model/Record.html#getApproximateArrivalTimestamp().
> This is useful information for users and should be exposed through the
> deserialization schema.