[ https://issues.apache.org/jira/browse/FLINK-38345?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matt Cuento updated FLINK-38345:
--------------------------------
Description:
When {{org.apache.flink.connector.base.source.hybrid.HybridSource}} recovers from a checkpoint, it calls {{restoreEnumerator()}} with the configured source factories and the checkpoint information. The enumerator {{org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator}} is then started, which invokes {{switchEnumerator()}}.

On recovery, {{currentEnumerator}} is still null, so the {{previousEnumerator}} captured by the source-switching logic is null as well.

Hybrid source implementations that use {{org.apache.flink.connector.base.source.hybrid.HybridSource.SourceSwitchContext}} to derive a start position at switch time therefore get null from {{getPreviousEnumerator()}} and cannot derive a start position on checkpoint recovery. A factory passed to {{addSource}} that dereferences the previous enumerator fails with an NPE.

The enumerator state is deserialized and restored only afterwards, and that step itself requires a source, because the serializer needed to read the state is obtained from the source. As long as a source depends on a prior enumerator to derive its start position, this bug will be in effect.

This bug can be reproduced via a unit test in the enumerator [1].

[1] - [https://github.com/apache/flink/pull/26975]
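The recovery ordering described above can be modelled with a small, self-contained Java sketch. It does not use Flink at all: {{ToyEnumerator}}, {{SwitchContext}}, {{ToyHybridEnumerator}} and the factory list are hypothetical stand-ins that only mirror the roles of the real enumerators, {{SourceSwitchContext}}, {{HybridSourceSplitEnumerator}} and the factories passed to {{addSource}}. The second factory derives its start position from the previous enumerator's end position, which is an assumed simplification of the kind of implementation affected by this issue. A fresh start switches normally, while an enumerator restored at the second source calls that factory with a null previous enumerator and throws.

```java
import java.util.List;
import java.util.function.Function;

// All types below are hypothetical stand-ins, not Flink classes.
public class HybridRecoverySketch {

    /** Toy enumerator that only remembers where its source stopped reading. */
    static final class ToyEnumerator {
        final long endPosition;
        ToyEnumerator(long endPosition) { this.endPosition = endPosition; }
    }

    /** Stand-in for SourceSwitchContext: exposes the enumerator being switched away from. */
    static final class SwitchContext {
        private final ToyEnumerator previous;
        SwitchContext(ToyEnumerator previous) { this.previous = previous; }
        ToyEnumerator getPreviousEnumerator() { return previous; }
    }

    /** Stand-in for the hybrid enumerator's switching state. */
    static final class ToyHybridEnumerator {
        private final List<Function<SwitchContext, ToyEnumerator>> factories;
        int currentIndex;
        ToyEnumerator current;

        ToyHybridEnumerator(List<Function<SwitchContext, ToyEnumerator>> factories, int startIndex) {
            this.factories = factories;
            this.currentIndex = startIndex; // on recovery: the index restored from the checkpoint
        }

        /** Capture the current enumerator as "previous", advance if one existed, call the factory. */
        void switchEnumerator() {
            ToyEnumerator previous = current; // null when called right after a restore
            if (current != null) {
                current = null;
                currentIndex++;
            }
            current = factories.get(currentIndex).apply(new SwitchContext(previous));
        }
    }

    /** Two sources; the second derives its start position from the previous enumerator. */
    static List<Function<SwitchContext, ToyEnumerator>> factories() {
        return List.of(
                ctx -> new ToyEnumerator(42),
                ctx -> new ToyEnumerator(ctx.getPreviousEnumerator().endPosition + 1));
    }

    /** Fresh start: create source 0, then switch to source 1 and return its start position. */
    static long normalSwitchPosition() {
        ToyHybridEnumerator enumerator = new ToyHybridEnumerator(factories(), 0);
        enumerator.switchEnumerator(); // creates source 0 (its factory ignores the context)
        enumerator.switchEnumerator(); // source 1 sees source 0 as the previous enumerator
        return enumerator.current.endPosition;
    }

    /** Returns true if starting an enumerator restored at restoredIndex throws an NPE. */
    static boolean recoveryThrowsNpe(int restoredIndex) {
        ToyHybridEnumerator enumerator = new ToyHybridEnumerator(factories(), restoredIndex);
        try {
            enumerator.switchEnumerator(); // the switch performed when the restored enumerator starts
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("normal switch start position: " + normalSwitchPosition()); // 43
        System.out.println("recovery at source 1 throws NPE: " + recoveryThrowsNpe(1)); // true
    }
}
```

The sketch leaves out the state-restore step entirely. Per the description, the checkpointed enumerator state can only be deserialized after the factory has produced a source (which supplies the serializer), so on recovery the factory necessarily runs before any information about the previous source is available.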
> HybridSource checkpoint recovery results in NPE if addSource depends on previousEnumerator from SourceSwitchContext
> -------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-38345
>                 URL: https://issues.apache.org/jira/browse/FLINK-38345
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / HybridSource
>    Affects Versions: 2.0.0, 1.19.1
>            Reporter: Matt Cuento
>            Priority: Minor
>              Labels: pull-request-available

--
This message was sent by Atlassian Jira
(v8.20.10#820010)