[ https://issues.apache.org/jira/browse/FLINK-35299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hong Liang Teoh updated FLINK-35299:
------------------------------------
    Description: 
h3. What

The FlinkKinesisConsumer allows users to read from [multiple Kinesis 
Streams|https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L224].

Users can also specify a STREAM_INITIAL_POSITION, which configures whether the 
consumer starts reading each stream from TRIM_HORIZON, LATEST, or AT_TIMESTAMP.
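For illustration, the plain-Java sketch below models how the STREAM_INITIAL_POSITION setting might be read from the consumer's Properties. It is not the connector's code: the enum and the resolve() helper are hypothetical stand-ins, the key "flink.stream.initpos" is our understanding of ConsumerConfigConstants.STREAM_INITIAL_POSITION, and the LATEST default is an assumption to verify against the connector version in use.

```java
import java.util.Properties;

public class InitialPositionExample {

    // Hypothetical stand-in for the three positions named above (not the connector's enum).
    enum InitialPosition { TRIM_HORIZON, LATEST, AT_TIMESTAMP }

    // Assumed to match ConsumerConfigConstants.STREAM_INITIAL_POSITION.
    static final String STREAM_INITIAL_POSITION = "flink.stream.initpos";

    // Returns the configured position, falling back to LATEST (assumed default) when unset.
    static InitialPosition resolve(Properties config) {
        String value = config.getProperty(STREAM_INITIAL_POSITION, InitialPosition.LATEST.name());
        return InitialPosition.valueOf(value);
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        System.out.println(resolve(config)); // prints LATEST
        config.setProperty(STREAM_INITIAL_POSITION, "TRIM_HORIZON");
        System.out.println(resolve(config)); // prints TRIM_HORIZON
    }
}
```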

When restoring the Kinesis Consumer from an existing snapshot, users can 
configure the consumer to read from additional Kinesis Streams. The expected 
behavior is for the FlinkKinesisConsumer to start reading these additional 
Kinesis Streams from the position set by STREAM_INITIAL_POSITION. However, it 
currently reads them from TRIM_HORIZON, regardless of the configured value.
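A simplified, hypothetical model of the reported restore behavior is sketched below. It uses plain Java collections rather than Flink or Kinesis types, and identifies shards as "streamName/shardId" strings (an assumption made for brevity, since Kinesis shard IDs are only unique within a stream). Any shard missing from the snapshot starts from TRIM_HORIZON, and the configured position is never consulted.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class RestoreBehaviorToday {

    // Hypothetical start positions; RESUME_FROM_CHECKPOINT means "continue from restored state".
    enum StartPosition { TRIM_HORIZON, LATEST, AT_TIMESTAMP, RESUME_FROM_CHECKPOINT }

    // Models the reported behavior: `configured` is ignored for every shard
    // that is absent from the snapshot, which is the problem described in this issue.
    static Map<String, StartPosition> startPositions(
            Set<String> restoredShardKeys, List<String> discoveredShardKeys, StartPosition configured) {
        Map<String, StartPosition> result = new LinkedHashMap<>();
        for (String key : discoveredShardKeys) {
            result.put(key, restoredShardKeys.contains(key)
                    ? StartPosition.RESUME_FROM_CHECKPOINT
                    : StartPosition.TRIM_HORIZON);
        }
        return result;
    }

    public static void main(String[] args) {
        Set<String> restored = Set.of("stream-a/shardId-000000000000");
        List<String> discovered = List.of(
                "stream-a/shardId-000000000000", "stream-b/shardId-000000000000");
        // stream-b was added on restore and configured for LATEST, yet it gets TRIM_HORIZON.
        System.out.println(startPositions(restored, discovered, StartPosition.LATEST));
    }
}
```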

This is surprising behavior and should be corrected.
h3. Why

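Principle of Least Astonishment: a stream added on restore should start from 
the same position it would in a fresh job with the same configuration.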
h3. How

We recommend reconstructing the set of previously seen streams by iterating 
through the [sequenceNumsStateForCheckpoint in 
FlinkKinesisConsumer#initializeState()|https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L454]. 
Any stream that does not appear in the restored state can then be treated as 
new and started from STREAM_INITIAL_POSITION.
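A rough, hypothetical sketch of the proposed logic in plain Java (no Flink or Kinesis types; shards keyed as "streamName/shardId" strings, which is our own simplification). The set of seen streams is rebuilt from the restored entries, analogous to iterating sequenceNumsStateForCheckpoint, and shards of streams absent from that set use the configured position. We assume that new shards of an already-seen stream (for example, child shards created by resharding) should keep starting from TRIM_HORIZON so that no records are skipped.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class RestoreWithInitialPosition {

    // Hypothetical start positions; RESUME_FROM_CHECKPOINT means "continue from restored state".
    enum StartPosition { TRIM_HORIZON, LATEST, AT_TIMESTAMP, RESUME_FROM_CHECKPOINT }

    // Extracts the stream name from a "streamName/shardId" key.
    static String streamOf(String shardKey) {
        return shardKey.substring(0, shardKey.indexOf('/'));
    }

    static Map<String, StartPosition> startPositions(
            Set<String> restoredShardKeys, List<String> discoveredShardKeys, StartPosition configured) {
        // Step 1: rebuild the set of streams that were read before the snapshot.
        Set<String> seenStreams = new HashSet<>();
        for (String key : restoredShardKeys) {
            seenStreams.add(streamOf(key));
        }
        // Step 2: choose a start position for every discovered shard.
        Map<String, StartPosition> result = new LinkedHashMap<>();
        for (String key : discoveredShardKeys) {
            if (restoredShardKeys.contains(key)) {
                result.put(key, StartPosition.RESUME_FROM_CHECKPOINT);
            } else if (seenStreams.contains(streamOf(key))) {
                // New shard of a known stream: read from the beginning (assumption above).
                result.put(key, StartPosition.TRIM_HORIZON);
            } else {
                // Shard of a stream added on restore: honour STREAM_INITIAL_POSITION.
                result.put(key, configured);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Set<String> restored = Set.of("stream-a/shardId-000000000000");
        List<String> discovered = List.of(
                "stream-a/shardId-000000000000",
                "stream-a/shardId-000000000001",
                "stream-b/shardId-000000000000");
        System.out.println(startPositions(restored, discovered, StartPosition.LATEST));
    }
}
```

Building the seen-streams set is a single pass over the restored entries, so the extra restore cost grows linearly with the size of that state.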
h3. Risks

This might increase state restore time, because every entry in the restored 
state has to be inspected. We could consider adding a feature flag that lets 
users turn this check off.

> FlinkKinesisConsumer does not respect StreamInitialPosition for new Kinesis 
> Stream when restoring from snapshot
> ---------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-35299
>                 URL: https://issues.apache.org/jira/browse/FLINK-35299
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kinesis
>    Affects Versions: aws-connector-4.2.0
>            Reporter: Hong Liang Teoh
>            Priority: Major
>             Fix For: aws-connector-4.4.0



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
