[ https://issues.apache.org/jira/browse/FLINK-32394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Danny Cranmer updated FLINK-32394:
----------------------------------
Description:
The init.pos setting is only partially stored in connector state, which can lead
to inconsistencies later on for idle streams. In particular, an issue arises
when init.pos is AT_TIMESTAMP and is later changed to TRIM_HORIZON.
The issue is that AT_TIMESTAMP is stored in the connector state, but the
timestamp itself is not. If a stream is idle and init.pos is changed to
TRIM_HORIZON, the connector still attempts to read from AT_TIMESTAMP (because
that is what is stored in state) and tries to get the timestamp from the
properties. However, the timestamp is no longer there, as the init.pos property
is now TRIM_HORIZON.
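To make the failure mode concrete, here is a minimal Java sketch of the mismatch. It is not the connector's code: RestoredShardState, PositionType and resolveStartTimestamp are hypothetical names, and the property keys (flink.stream.initpos, flink.stream.initpos.timestamp) and the date format are assumptions modelled on the connector's configuration. The position type comes from restored state while the timestamp is read from the current properties, so once init.pos is switched to TRIM_HORIZON and the timestamp property is gone, the timestamp string handed to the parser is null.

{code:java}
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

/**
 * Hypothetical model of the reported mismatch; this is not the connector's code.
 * Restored state keeps only the starting-position type, while the timestamp is
 * looked up again in the (possibly changed) consumer properties.
 */
public class InitPosMismatchDemo {

    // Assumed property keys and date format, modelled on the connector's config.
    static final String INIT_POS = "flink.stream.initpos";
    static final String INIT_TIMESTAMP = "flink.stream.initpos.timestamp";
    static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";

    enum PositionType { TRIM_HORIZON, LATEST, AT_TIMESTAMP }

    /** Hypothetical state entry for an idle shard: the type survives, the timestamp does not. */
    static final class RestoredShardState {
        final PositionType type;

        RestoredShardState(PositionType type) {
            this.type = type;
        }
    }

    /** Takes the type from restored state but reads the timestamp from the current properties. */
    static Date resolveStartTimestamp(RestoredShardState state, Properties props) {
        if (state.type != PositionType.AT_TIMESTAMP) {
            return null; // TRIM_HORIZON and LATEST need no timestamp
        }
        String raw = props.getProperty(INIT_TIMESTAMP); // null after switching to TRIM_HORIZON
        try {
            // DateFormat.parse(null) throws NullPointerException, as in the sample trace
            return new SimpleDateFormat(TIMESTAMP_FORMAT).parse(raw);
        } catch (ParseException | NullPointerException e) {
            // Wrapped as an IllegalArgumentException, matching the reported exception chain
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        RestoredShardState idleShard = new RestoredShardState(PositionType.AT_TIMESTAMP);

        Properties original = new Properties();
        original.setProperty(INIT_POS, "AT_TIMESTAMP");
        original.setProperty(INIT_TIMESTAMP, "2023-06-01T00:00:00.000+00:00");
        System.out.println("original config: " + resolveStartTimestamp(idleShard, original).getTime());

        Properties reconfigured = new Properties();
        reconfigured.setProperty(INIT_POS, "TRIM_HORIZON");
        try {
            resolveStartTimestamp(idleShard, reconfigured);
        } catch (IllegalArgumentException e) {
            System.out.println("reconfigured: cause = " + e.getCause().getClass().getSimpleName());
        }
    }
}
{code}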
Sample error
{code:java}
java.lang.IllegalArgumentException: java.lang.NullPointerException
	at org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil.parseStreamTimestampStartingPosition(KinesisConfigUtil.java:579)
	at org.apache.flink.streaming.connectors.kinesis.util.AWSUtil.getStartingPosition(AWSUtil.java:325)
	at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.createRecordPublisher(KinesisDataFetcher.java:495)
	at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.createShardConsumer(KinesisDataFetcher.java:465)
	at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.runFetcher(KinesisDataFetcher.java:592)
	at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:392)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
Caused by: java.lang.NullPointerException
	at java.base/java.text.SimpleDateFormat.parse(SimpleDateFormat.java:1470)
	at java.base/java.text.DateFormat.parse(DateFormat.java:393)
	at org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil.parseStreamTimestampStartingPosition(KinesisConfigUtil.java:577)
	... 8 more
{code}
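The root cause in the trace is DateFormat.parse being handed a null timestamp string. One possible direction, sketched under assumptions rather than taken from the connector or from an agreed fix, is to resolve the timestamp once and keep it in state next to the position type, so that restoring an idle shard never depends on the current properties. StoredStartPosition, capture, startTimestampOnRestore, the property keys, the date format and the LATEST fallback are all hypothetical.

{code:java}
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Optional;
import java.util.Properties;

/**
 * Hypothetical sketch, not the connector's code or an agreed fix: the state entry
 * carries the resolved timestamp, so restoring never re-reads it from properties.
 */
public class SelfContainedStartPosition {

    // Assumed property keys and date format, modelled on the connector's config.
    static final String INIT_POS = "flink.stream.initpos";
    static final String INIT_TIMESTAMP = "flink.stream.initpos.timestamp";
    static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";

    enum PositionType { TRIM_HORIZON, LATEST, AT_TIMESTAMP }

    /** Hypothetical state entry: type plus epoch millis (set only for AT_TIMESTAMP). */
    static final class StoredStartPosition {
        final PositionType type;
        final Long timestampMillis;

        StoredStartPosition(PositionType type, Long timestampMillis) {
            if (type == PositionType.AT_TIMESTAMP && timestampMillis == null) {
                throw new IllegalArgumentException("AT_TIMESTAMP requires a timestamp");
            }
            this.type = type;
            this.timestampMillis = timestampMillis;
        }
    }

    /** Parses the configuration once, when the starting position is first captured. */
    static StoredStartPosition capture(Properties props) throws ParseException {
        // "LATEST" as the fallback is an assumption about the connector's default.
        PositionType type = PositionType.valueOf(props.getProperty(INIT_POS, "LATEST"));
        if (type != PositionType.AT_TIMESTAMP) {
            return new StoredStartPosition(type, null);
        }
        String raw = props.getProperty(INIT_TIMESTAMP);
        if (raw == null) {
            throw new IllegalArgumentException(INIT_TIMESTAMP + " is required for AT_TIMESTAMP");
        }
        long millis = new SimpleDateFormat(TIMESTAMP_FORMAT).parse(raw).getTime();
        return new StoredStartPosition(type, millis);
    }

    /** On restore, the timestamp comes from state only; current properties are not consulted. */
    static Optional<Long> startTimestampOnRestore(StoredStartPosition stored) {
        return stored.type == PositionType.AT_TIMESTAMP
                ? Optional.of(stored.timestampMillis)
                : Optional.empty();
    }

    public static void main(String[] args) throws ParseException {
        Properties original = new Properties();
        original.setProperty(INIT_POS, "AT_TIMESTAMP");
        original.setProperty(INIT_TIMESTAMP, "2023-06-01T00:00:00.000+00:00");
        StoredStartPosition stored = capture(original);

        // The job is later reconfigured; the stored entry is still self-sufficient.
        Properties reconfigured = new Properties();
        reconfigured.setProperty(INIT_POS, "TRIM_HORIZON");

        System.out.println(startTimestampOnRestore(stored)); // Optional[1685577600000]
        System.out.println(capture(reconfigured).type);      // TRIM_HORIZON
    }
}
{code}

Whether a changed init.pos should instead override the restored position for shards that never received records is a separate behavioural question this report does not settle; the sketch only shows that the partial-state inconsistency disappears once the timestamp is stored together with the position type.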
was:
The init.pos setting is only partially stored in connector state, which can lead
to inconsistencies later on for idle streams. In particular, an issue arises
when init.pos is AT_TIMESTAMP and is later changed to TRIM_HORIZON.
The issue is that AT_TIMESTAMP is stored in the connector state, but the
timestamp itself is not. If a stream is idle and init.pos is changed to
TRIM_HORIZON, the connector still attempts to read from AT_TIMESTAMP (because
that is what is stored in state) and tries to get the timestamp from the
properties. However, the timestamp is no longer there, as the init.pos property
is now TRIM_HORIZON.
> Init.pos is partially stored in connector state
> -----------------------------------------------
>
> Key: FLINK-32394
> URL: https://issues.apache.org/jira/browse/FLINK-32394
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kinesis
> Affects Versions: 1.17.1
> Reporter: Usamah Jassat
> Priority: Minor
--
This message was sent by Atlassian Jira
(v8.20.10#820010)