[ 
https://issues.apache.org/jira/browse/FLINK-5932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas updated FLINK-5932:
----------------------------------
    Description: 
Currently in the {{AbstractStreamOperator::initializeState(OperatorStateHandles 
stateHandles)}}, the {{restoreStreamCheckpointed}} which is responsible for 
restoring state from previous Flink versions, (backwards compatibility) is 
called before the {{initializeState(StateInitializationContext context)}} which 
is responsible for initializing the state in Flink 1.2.

This has the negative side effect that when implementing the backwards 
compatibility strategy for a given operator, we have to restore the old state, 
store it in local variables, and register it with the new state abstractions in 
the {{initializeState()}} or the {{open()}}. This creates a lot of unnecessary 
code in the operators, and potential memory leaks if the local variables are 
not "null-ified".

This issue proposes to call the {{restoreStreamCheckpointed}} after the 
{{initializeState(StateInitializationContext context)}}. This way, the new 
operator state will have been initialized (e.g. keyed state), and the 
{{restoreStreamCheckpointed}} will be able to register its state directly with 
the new abstractions, instead of putting it in local variables and wait for the 
{{initializeState}} or the {{open()}} to re-register it.

  was:
Currently in the `AbstractStreamOperator::initializeState(OperatorStateHandles 
stateHandles)`, the `restoreStreamCheckpointed` which is responsible for 
restoring state from previous Flink versions, (backwards compatibility) is 
called before the `initializeState(StateInitializationContext context)` which 
is responsible for initializing the state in Flink 1.2.

This has the negative side effect that when implementing the backwards 
compatibility strategy for a given operator, we have to restore the old state, 
store it in local variables, and register it with the new state abstractions in 
the `initializeState()` or the `open()`. This creates a lot of unnecessary code 
in the operators, and potential memory leaks if the local variables are not 
"null-ified".

This issue proposes to call the `restoreStreamCheckpointed` after the 
`initializeState(StateInitializationContext context)`. This way, the new 
operator state will have been initialized (e.g. keyed state), and the 
`restoreStreamCheckpointed` will be able to register its state directly with 
the new abstractions, instead of putting it in local variables and wait for the 
`initializeState` or the `open()` to re-register it.


> Order of legacy vs new state initialization in the AbstractStreamOperator.
> --------------------------------------------------------------------------
>
>                 Key: FLINK-5932
>                 URL: https://issues.apache.org/jira/browse/FLINK-5932
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.2.0
>            Reporter: Kostas Kloudas
>             Fix For: 1.3.0
>
>
> Currently in the 
> {{AbstractStreamOperator::initializeState(OperatorStateHandles 
> stateHandles)}}, the {{restoreStreamCheckpointed}} which is responsible for 
> restoring state from previous Flink versions, (backwards compatibility) is 
> called before the {{initializeState(StateInitializationContext context)}} 
> which is responsible for initializing the state in Flink 1.2.
> This has the negative side effect that when implementing the backwards 
> compatibility strategy for a given operator, we have to restore the old 
> state, store it in local variables, and register it with the new state 
> abstractions in the {{initializeState()}} or the {{open()}}. This creates a 
> lot of unnecessary code in the operators, and potential memory leaks if the 
> local variables are not "null-ified".
> This issue proposes to call the {{restoreStreamCheckpointed}} after the 
> {{initializeState(StateInitializationContext context)}}. This way, the new 
> operator state will have been initialized (e.g. keyed state), and the 
> {{restoreStreamCheckpointed}} will be able to register its state directly 
> with the new abstractions, instead of putting it in local variables and wait 
> for the {{initializeState}} or the {{open()}} to re-register it.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to