[
https://issues.apache.org/jira/browse/FLINK-4019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15367544#comment-15367544
]
ASF GitHub Bot commented on FLINK-4019:
---------------------------------------
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/2214#discussion_r70059358
--- Diff:
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
---
@@ -491,13 +491,14 @@ protected Properties getConsumerConfiguration() {
* This method is called by {@link ShardConsumer}s.
*
* @param record the record to collect
+ * @param recordTimestamp timestamp to attach to the collected record
* @param shardStateIndex index of the shard to update in
subscribedShardsState;
* this index should be the returned value from
* {@link
KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)},
called
* when the shard state was registered.
* @param lastSequenceNumber the last sequence number value to update
*/
- protected void emitRecordAndUpdateState(T record, int shardStateIndex,
SequenceNumber lastSequenceNumber) {
+ protected void emitRecordAndUpdateState(T record, long recordTimestamp,
int shardStateIndex, SequenceNumber lastSequenceNumber) {
synchronized (checkpointLock) {
sourceContext.collect(record);
--- End diff --
Ah, sorry, this is actually wrong. Should be using
`sourceContext.collectWithTimestamp()`, missed this.
> Expose approximateArrivalTimestamp through the KinesisDeserializationSchema
> interface
> -------------------------------------------------------------------------------------
>
> Key: FLINK-4019
> URL: https://issues.apache.org/jira/browse/FLINK-4019
> Project: Flink
> Issue Type: Sub-task
> Components: Streaming Connectors
> Affects Versions: 1.1.0
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.1.0
>
>
> Amazon's Record class also gives information about the timestamp of when
> Kinesis successfully receives the record:
> http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/model/Record.html#getApproximateArrivalTimestamp().
> This should be useful info for users and should be exposed through the
> deserialization schema.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)