[
https://issues.apache.org/jira/browse/FLINK-8484?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Tzu-Li (Gordon) Tai updated FLINK-8484:
---------------------------------------
Priority: Blocker (was: Major)
> Kinesis consumer re-reads closed shards on job restart
> ------------------------------------------------------
>
> Key: FLINK-8484
> URL: https://issues.apache.org/jira/browse/FLINK-8484
> Project: Flink
> Issue Type: Bug
> Components: Kinesis Connector
> Affects Versions: 1.3.2
> Reporter: Philip Luppens
> Priority: Blocker
> Labels: bug, flink, kinesis
>
> We’re using the connector to subscribe to streams ranging from 1 to 100
> shards, and use the kinesis-scaling-utils to dynamically scale the Kinesis
> streams up and down during peak times. What we’ve noticed is that, while a
> stream contained closed shards, any Flink job restart from a checkpoint or
> savepoint would result in shards being re-read from the event horizon,
> duplicating our events.
>
> We started checking the checkpoint state, and found that the shards were
> stored correctly with the proper sequence number (including for closed
> shards), but that upon restarts, the older closed shards would be read from
> the event horizon, as if their restored state were being ignored.
>
> In the end, we believe we found the problem: in the FlinkKinesisConsumer’s
> run() method, each shard returned by the KinesisDataFetcher is matched
> against the shard metadata from the restored state via a containsKey() call,
> which relies on StreamShardMetadata’s equals() method. However, equals()
> compares all properties, including the endingSequenceNumber, which might have
> changed between the restored state’s checkpoint and our data fetch. The
> equality check then fails, so containsKey() returns false and the shard is
> re-read from the event horizon, even though it was present in the restored
> state.
>
> We’ve created a workaround that compares only the shardId and stream name
> when restoring the state of shards we’ve already seen, and this seems to
> work correctly.
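
The mismatch and the workaround described above can be illustrated with a
minimal, self-contained sketch. It does not use the real Flink classes:
ShardMeta is a hypothetical, simplified stand-in for StreamShardMetadata, the
stream name, shard ID and sequence numbers are made up, and the scenario (a
shard that was open at checkpoint time and closed before the restart) is an
assumption about how the endingSequenceNumber might change.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class ShardRestoreSketch {

    /** Hypothetical, simplified stand-in for StreamShardMetadata. */
    static final class ShardMeta {
        final String streamName;
        final String shardId;
        final String endingSequenceNumber; // null while the shard is still open

        ShardMeta(String streamName, String shardId, String endingSequenceNumber) {
            this.streamName = streamName;
            this.shardId = shardId;
            this.endingSequenceNumber = endingSequenceNumber;
        }

        // Compares ALL properties, as the report says the real equals() does.
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof ShardMeta)) {
                return false;
            }
            ShardMeta other = (ShardMeta) o;
            return Objects.equals(streamName, other.streamName)
                    && Objects.equals(shardId, other.shardId)
                    && Objects.equals(endingSequenceNumber, other.endingSequenceNumber);
        }

        @Override
        public int hashCode() {
            return Objects.hash(streamName, shardId, endingSequenceNumber);
        }
    }

    /** Map lookup that depends on full equality; returns null when the key does not match. */
    static String restoreByFullEquality(Map<ShardMeta, String> restored, ShardMeta discovered) {
        return restored.containsKey(discovered) ? restored.get(discovered) : null;
    }

    /** Workaround-style lookup: match on stream name and shard ID only. */
    static String restoreByIdentity(Map<ShardMeta, String> restored, ShardMeta discovered) {
        for (Map.Entry<ShardMeta, String> entry : restored.entrySet()) {
            ShardMeta key = entry.getKey();
            if (key.streamName.equals(discovered.streamName)
                    && key.shardId.equals(discovered.shardId)) {
                return entry.getValue();
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // State captured at checkpoint time: the shard was still open.
        Map<ShardMeta, String> restored = new HashMap<>();
        restored.put(new ShardMeta("events", "shardId-000000000000", null), "seq-4242");

        // The same shard as seen after the restart: it has been closed in the meantime.
        ShardMeta discovered = new ShardMeta("events", "shardId-000000000000", "seq-9999");

        // Full equality misses the restored entry, so the shard would start over.
        System.out.println(restoreByFullEquality(restored, discovered));
        // Matching on identity finds the checkpointed sequence number.
        System.out.println(restoreByIdentity(restored, discovered));
    }
}
```

The linear scan in restoreByIdentity keeps the sketch short. Keying the
restored map on a (stream name, shard ID) pair would give the same result with
a constant-time lookup. Whether the actual fix should change the lookup or
StreamShardMetadata’s equals() is a design question this sketch does not
settle.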