Philip Luppens created FLINK-8484:
-------------------------------------
Summary: Kinesis consumer re-reads
Key: FLINK-8484
URL: https://issues.apache.org/jira/browse/FLINK-8484
Project: Flink
Issue Type: Bug
Components: Kinesis Connector
Affects Versions: 1.3.2
Reporter: Philip Luppens
We’re using the connector to subscribe to streams ranging from 1 to 100
shards, and we use kinesis-scaling-utils to scale the Kinesis streams up and
down dynamically during peak times. We noticed that, once a stream had closed
shards, any restart of the Flink job from a checkpoint or savepoint caused
those shards to be re-read from the trim horizon (the oldest available
record), duplicating our events.
We inspected the checkpointed state and found that the shards, including the
closed ones, were stored correctly with the proper sequence numbers. On
restart, however, the older closed shards were read from the trim horizon, as
if their restored state had been ignored.
We believe we have found the cause. In FlinkKinesisConsumer’s run() method,
each shard returned by the KinesisDataFetcher is looked up in the shard
metadata restored from the checkpoint. This lookup is a containsKey() call, so
it relies on StreamShardMetadata’s equals() method. However, equals() compares
all properties, including endingSequenceNumber, which can change between the
time the state was checkpointed and the time the shard is fetched again. For
example, a shard recorded while still open has no ending sequence number, but
it gains one once resharding closes it. The equality check then fails,
containsKey() returns false, and the shard is re-read from the trim horizon
even though it is present in the restored state.
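
The mismatch can be illustrated outside Flink with a minimal Java sketch
(Java 16+ for records). It assumes a hypothetical, simplified ShardMeta record
standing in for StreamShardMetadata, a plain HashMap for the restored state,
and placeholder values such as "seq-42"; none of these are the connector’s
actual types or data.

```java
import java.util.HashMap;
import java.util.Map;

public class ShardLookupBug {

    // Hypothetical, simplified stand-in for StreamShardMetadata. A record's
    // generated equals()/hashCode() compare every component, including
    // endingSequenceNumber, i.e. an all-fields comparison.
    public record ShardMeta(String streamName, String shardId, String endingSequenceNumber) {}

    // Full-object lookup, as a containsKey() call on the restored state does.
    public static boolean isRestored(Map<ShardMeta, String> restored, ShardMeta discovered) {
        return restored.containsKey(discovered);
    }

    public static void main(String[] args) {
        // Restored state: the shard was recorded while still open, so it has
        // no ending sequence number (placeholder sequence number value).
        Map<ShardMeta, String> restored = new HashMap<>();
        restored.put(new ShardMeta("events", "shardId-000000000001", null), "seq-42");

        // On restart, discovery returns the same shard, now closed by
        // resharding and carrying an ending sequence number.
        ShardMeta discovered = new ShardMeta("events", "shardId-000000000001", "seq-end-99");

        // Prints "false": the shard is treated as new and would be read
        // from the trim horizon again.
        System.out.println(isRestored(restored, discovered));
    }
}
```

Only endingSequenceNumber differs between the two objects, yet the lookup
misses, which matches the duplicate reads described above.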
As a workaround, we now match restored shards on stream name and shard ID
only, so that the state of shards we’ve already seen is restored; this appears
to work correctly.
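
The workaround’s actual code is not included in this report. A minimal Java
sketch of the same idea (Java 16+) follows; it assumes a hypothetical,
simplified ShardMeta record in place of StreamShardMetadata, a hypothetical
ShardKey record of stream name and shard ID, and placeholder sequence numbers.
The restored state is re-keyed by ShardKey, so fields such as
endingSequenceNumber no longer affect the lookup.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class ShardKeyLookup {

    // Hypothetical, simplified shard metadata (stand-in for StreamShardMetadata).
    public record ShardMeta(String streamName, String shardId, String endingSequenceNumber) {}

    // Hypothetical restore key: a shard's identity is stream name + shard ID only.
    public record ShardKey(String streamName, String shardId) {
        public static ShardKey of(ShardMeta meta) {
            return new ShardKey(meta.streamName(), meta.shardId());
        }
    }

    // Re-keys the restored state by (streamName, shardId), ignoring fields
    // such as endingSequenceNumber that can change after a checkpoint.
    public static Map<ShardKey, String> indexByKey(Map<ShardMeta, String> restored) {
        Map<ShardKey, String> byKey = new HashMap<>();
        for (Map.Entry<ShardMeta, String> e : restored.entrySet()) {
            byKey.put(ShardKey.of(e.getKey()), e.getValue());
        }
        return byKey;
    }

    // Returns the restored sequence number, or empty if the shard is new.
    public static Optional<String> restoredSequenceNumber(Map<ShardKey, String> byKey, ShardMeta discovered) {
        return Optional.ofNullable(byKey.get(ShardKey.of(discovered)));
    }

    public static void main(String[] args) {
        Map<ShardMeta, String> restored = new HashMap<>();
        restored.put(new ShardMeta("events", "shardId-000000000001", null), "seq-42");
        Map<ShardKey, String> byKey = indexByKey(restored);

        // Same shard, now closed: still found despite the new ending sequence number.
        ShardMeta closedNow = new ShardMeta("events", "shardId-000000000001", "seq-end-99");
        // Genuinely new shard: not in the restored state.
        ShardMeta brandNew = new ShardMeta("events", "shardId-000000000002", null);

        System.out.println(restoredSequenceNumber(byKey, closedNow)); // Optional[seq-42]
        System.out.println(restoredSequenceNumber(byKey, brandNew));  // Optional.empty
    }
}
```

The stream name is kept in the key because shard IDs such as
shardId-000000000000 are only unique within a single stream, and one consumer
may subscribe to several streams.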
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)