[https://issues.apache.org/jira/browse/FLINK-35299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel]
Hong Liang Teoh updated FLINK-35299:
------------------------------------
Description:
h3. What
The FlinkKinesisConsumer allows users to read from
[multiple Kinesis Streams|https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L224].
Users can also specify a STREAM_INITIAL_POSITION, which configures whether
the consumer starts reading a stream from TRIM_HORIZON, LATEST, or
AT_TIMESTAMP.
When restoring the FlinkKinesisConsumer from an existing snapshot, users can
configure it to read from additional Kinesis Streams. The expected behavior is
for the FlinkKinesisConsumer to start reading these additional streams from
the position set by STREAM_INITIAL_POSITION. However, it currently reads them
from TRIM_HORIZON. This is surprising and should be corrected.
h3. Why
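Principle of Least Astonishment: a stream added on restore should start from
the configured STREAM_INITIAL_POSITION, just as it would on a fresh start.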
h3. How
We propose reconstructing the set of previously seen streams by iterating
through [sequenceNumsStateForCheckpoint in FlinkKinesisConsumer#initializeState()|https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L454],
so that any stream absent from the restored state is treated as new and
started from STREAM_INITIAL_POSITION.
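A minimal Java sketch of the proposed check, under stated assumptions: `StreamShard` and `InitialPosition` are hypothetical stand-ins, not the connector's real state types, and the restored state is modelled as a plain `Map` from (stream, shard) to sequence number. It also assumes that a shard missing from the restored state but belonging to a previously seen stream (for example, one created by resharding) should keep starting from TRIM_HORIZON, while a shard of a stream that never appeared in the state takes the configured STREAM_INITIAL_POSITION. Building the set of seen streams is a single pass over the restored entries.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class NewStreamStartPositionSketch {

    // Hypothetical stand-in for a (stream, shard) key in the restored checkpoint state.
    record StreamShard(String streamName, String shardId) {}

    // Hypothetical mirror of the STREAM_INITIAL_POSITION values named in the issue.
    enum InitialPosition { TRIM_HORIZON, LATEST, AT_TIMESTAMP }

    // Single pass over the restored state: collect every stream name that owns at least one shard.
    static Set<String> previouslySeenStreams(Map<StreamShard, String> restoredSequenceNums) {
        Set<String> seen = new HashSet<>();
        for (StreamShard key : restoredSequenceNums.keySet()) {
            seen.add(key.streamName());
        }
        return seen;
    }

    // Start position for a shard that is NOT in the restored state (assumed policy):
    // shard of a known stream -> TRIM_HORIZON; shard of an unknown (new) stream -> configured position.
    static InitialPosition startPositionForUnrestoredShard(
            StreamShard shard, Set<String> seenStreams, InitialPosition configured) {
        return seenStreams.contains(shard.streamName()) ? InitialPosition.TRIM_HORIZON : configured;
    }

    public static void main(String[] args) {
        // The restored state only knows about "stream-a"; sequence numbers are placeholders.
        Map<StreamShard, String> restored = new HashMap<>();
        restored.put(new StreamShard("stream-a", "shardId-000000000000"), "seq-placeholder-1");
        restored.put(new StreamShard("stream-a", "shardId-000000000001"), "seq-placeholder-2");

        Set<String> seen = previouslySeenStreams(restored);

        StreamShard reshardedShard = new StreamShard("stream-a", "shardId-000000000002");
        StreamShard newStreamShard = new StreamShard("stream-b", "shardId-000000000000");

        System.out.println(startPositionForUnrestoredShard(reshardedShard, seen, InitialPosition.LATEST)); // TRIM_HORIZON
        System.out.println(startPositionForUnrestoredShard(newStreamShard, seen, InitialPosition.LATEST)); // LATEST
    }
}
```

Keying the decision on the stream name rather than the shard is what lets a newly added stream be told apart from new shards of a stream that was already being consumed.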
h3. Risks
This may increase state restore time. We could add a feature flag that lets
users turn this check off.
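One way such a flag could behave, as a hedged Java sketch: `newStreamCheckEnabled` is a hypothetical parameter, not an existing connector option, and `UnrestoredShard` and `InitialPosition` are illustrative types. The restored-state scan is passed in as a `Supplier` so that its cost is only paid when the check is on. With the flag off, every shard missing from the restored state falls back to TRIM_HORIZON, which matches the current behavior described in this issue.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

public class NewStreamCheckFlagSketch {

    // Hypothetical mirror of the STREAM_INITIAL_POSITION values named in the issue.
    enum InitialPosition { TRIM_HORIZON, LATEST, AT_TIMESTAMP }

    // Hypothetical stand-in for a discovered shard that was not found in the restored state.
    record UnrestoredShard(String streamName, String shardId) {}

    static Map<UnrestoredShard, InitialPosition> assignStartPositions(
            List<UnrestoredShard> unrestoredShards,
            Supplier<Set<String>> seenStreamsScan, // scans restored state; only invoked when the flag is on
            InitialPosition configured,
            boolean newStreamCheckEnabled) {       // hypothetical feature flag
        Set<String> seen = newStreamCheckEnabled ? seenStreamsScan.get() : Set.of();
        Map<UnrestoredShard, InitialPosition> result = new LinkedHashMap<>();
        for (UnrestoredShard shard : unrestoredShards) {
            boolean isNewStream = newStreamCheckEnabled && !seen.contains(shard.streamName());
            // Flag off, or stream already known: keep the existing TRIM_HORIZON behavior.
            result.put(shard, isNewStream ? configured : InitialPosition.TRIM_HORIZON);
        }
        return result;
    }

    public static void main(String[] args) {
        List<UnrestoredShard> shards = List.of(
                new UnrestoredShard("stream-a", "shardId-000000000002"),
                new UnrestoredShard("stream-b", "shardId-000000000000"));
        // Pretend the restored state only held "stream-a".
        Supplier<Set<String>> scan = () -> Set.of("stream-a");

        System.out.println(assignStartPositions(shards, scan, InitialPosition.LATEST, true).values());
        // [TRIM_HORIZON, LATEST]
        System.out.println(assignStartPositions(shards, scan, InitialPosition.LATEST, false).values());
        // [TRIM_HORIZON, TRIM_HORIZON]
    }
}
```

Deferring the scan behind the flag keeps the restore path identical to today's when the check is disabled, which is the trade-off the risk above is concerned with.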
was:
h3. What
The FlinkKinesisConsumer allows users to read from
[multiple Kinesis Streams|https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L224].
Users can also specify a STREAM_INITIAL_POSITION, which configures whether
the consumer starts reading a stream from TRIM_HORIZON, LATEST, or
AT_TIMESTAMP.
When restoring the FlinkKinesisConsumer from an existing snapshot, users can
configure it to read from additional Kinesis Streams. The expected behavior is
for the FlinkKinesisConsumer to start reading these additional streams from
the position set by STREAM_INITIAL_POSITION. However, it currently reads them
from TRIM_HORIZON. This is surprising and should be corrected.
h3. Why
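Principle of Least Astonishment: a stream added on restore should start from
the configured STREAM_INITIAL_POSITION, just as it would on a fresh start.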
h3. How
We propose reconstructing the set of previously seen streams by iterating
through [sequenceNumsStateForCheckpoint in FlinkKinesisConsumer#initializeState()|https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L454],
so that any stream absent from the restored state is treated as new and
started from STREAM_INITIAL_POSITION.
> FlinkKinesisConsumer does not respect StreamInitialPosition for new Kinesis
> Stream when restoring from snapshot
> ---------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-35299
> URL: https://issues.apache.org/jira/browse/FLINK-35299
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kinesis
> Affects Versions: aws-connector-4.2.0
> Reporter: Hong Liang Teoh
> Priority: Major
> Fix For: aws-connector-4.4.0
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)