GitHub user zentol opened a pull request:

    https://github.com/apache/flink/pull/3770

    [FLINK-5892] Restore state on the operator level

    ## General
    This PR is a collaboration between @guoweiM and myself, enabling Flink to
    restore state on the operator level. This means that the topology of a job
    may change with regard to chains when restoring from a 1.3 savepoint,
    allowing the arbitrary addition, removal or modification of chains.
    
    The cornerstone of this is a semantic change to savepoints; no structural
    changes have been made to the `SavepointV0/1/2` classes or their serialized
    format:
    
    In 1.2, a savepoint contains the states of tasks. If a task consists of
    multiple operators, the stored `TaskState` internally contains a list of
    states, one entry for each operator.
    
    In 1.3, a savepoint contains only the states of operators; the notion of
    tasks is eliminated. If a task consists of multiple operators, we instead
    store one `TaskState` per operator. Each of these internally contains a
    list of states with a length of 1.
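    To illustrate the semantic change, the following Java sketch converts a
    1.2-style, task-level layout into the 1.3-style, operator-level one. It is
    a simplification, not Flink's actual code. The `SavepointLayouts` class,
    the `toOperatorLevel` method, plain `String`s as IDs and state handles, and
    ignoring parallelism are all assumptions made for illustration. It also
    assumes that each task's operator IDs are listed in the same order as its
    operator states.

    ```java
    import java.util.Collections;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    /** Hypothetical, simplified model of the 1.2 -> 1.3 savepoint semantics (not Flink code). */
    final class SavepointLayouts {

        /**
         * taskStates:         1.2 layout, task ID -> one state per chained operator.
         * operatorIdsPerTask: task ID -> IDs of its chained operators, same order as the states.
         * Returns the 1.3 layout: operator ID -> list containing exactly one state.
         */
        static Map<String, List<String>> toOperatorLevel(
                Map<String, List<String>> taskStates,
                Map<String, List<String>> operatorIdsPerTask) {
            Map<String, List<String>> operatorStates = new LinkedHashMap<>();
            for (Map.Entry<String, List<String>> task : taskStates.entrySet()) {
                List<String> states = task.getValue();
                List<String> operatorIds = operatorIdsPerTask.get(task.getKey());
                if (operatorIds == null || operatorIds.size() != states.size()) {
                    throw new IllegalArgumentException(
                            "Operator IDs do not match the states of task " + task.getKey());
                }
                for (int i = 0; i < states.size(); i++) {
                    // one entry per operator, each holding a list of length 1
                    operatorStates.put(operatorIds.get(i), Collections.singletonList(states.get(i)));
                }
            }
            return operatorStates;
        }

        public static void main(String[] args) {
            Map<String, List<String>> taskStates = new LinkedHashMap<>();
            taskStates.put("task-1", List.of("source-state", "map-state"));
            Map<String, List<String>> operatorIds = Map.of("task-1", List.of("op-source", "op-map"));
            // prints {op-source=[source-state], op-map=[map-state]}
            System.out.println(toOperatorLevel(taskStates, operatorIds));
        }
    }
    ```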
    
    ## Implementation
    
    For this to work, a number of changes had to be made.
    
    First and foremost, we required a new `StateAssignmentOperation` (SAO) that
    is aware of operators.
    (74881a2, 8be9c58, 4fa8bbd)
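    A minimal sketch of why operator awareness allows re-chaining follows. It
    assumes an operator-level savepoint modelled as a map from operator ID to
    state. It is not the actual `StateAssignmentOperation`, which among other
    things must also handle parallelism. The `OperatorAwareAssignment` class,
    its `assign` method, `String` IDs, and rejecting unmatched state are all
    hypothetical choices of this sketch.

    ```java
    import java.util.HashSet;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    /** Hypothetical sketch of operator-aware state assignment (not Flink's StateAssignmentOperation). */
    final class OperatorAwareAssignment {

        /**
         * restoredStates:      operator ID -> restored state (operator-level savepoint).
         * verticesToOperators: vertex ID -> IDs of the operators chained in that vertex.
         * Returns vertex ID -> (operator ID -> state) for every operator that has restored state.
         */
        static Map<String, Map<String, String>> assign(
                Map<String, String> restoredStates,
                Map<String, List<String>> verticesToOperators) {
            Map<String, Map<String, String>> assignment = new LinkedHashMap<>();
            Set<String> unmatched = new HashSet<>(restoredStates.keySet());
            for (Map.Entry<String, List<String>> vertex : verticesToOperators.entrySet()) {
                Map<String, String> vertexStates = new LinkedHashMap<>();
                for (String operatorId : vertex.getValue()) {
                    // lookup is per operator, so chain boundaries are irrelevant
                    String state = restoredStates.get(operatorId);
                    if (state != null) {
                        vertexStates.put(operatorId, state);
                        unmatched.remove(operatorId);
                    }
                    // operators without restored state (e.g. newly added ones) start empty
                }
                assignment.put(vertex.getKey(), vertexStates);
            }
            if (!unmatched.isEmpty()) {
                // this sketch rejects state whose operator no longer exists
                throw new IllegalStateException("No operator found for state of " + unmatched);
            }
            return assignment;
        }

        public static void main(String[] args) {
            // op-source and op-map were chained when the savepoint was taken
            Map<String, String> restored = Map.of("op-source", "s1", "op-map", "s2");
            // new topology: the chain is split and a new operator is chained after op-map
            Map<String, List<String>> topology = new LinkedHashMap<>();
            topology.put("vertex-1", List.of("op-source"));
            topology.put("vertex-2", List.of("op-map", "op-new"));
            // prints {vertex-1={op-source=s1}, vertex-2={op-map=s2}}
            System.out.println(assign(restored, topology));
        }
    }
    ```

    Because states are looked up per operator ID, it does not matter in this
    sketch how the operators were chained when the savepoint was taken. A
    previous chain can be split across vertices, and a newly added operator
    simply starts without state.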
    
    Since the SAO uses the `ExecutionGraph` classes to map the restored state,
    it was necessary to forward the IDs of all contained operators from the
    `StreamingJobGraphGenerator` to the `ExecutionJobVertex`.
    (73427c3)
    
    The `PendingCheckpoint` class had to be adjusted to conform to the new
    semantics: each received `SubtaskState`, containing the state of a task, is
    broken down into `SubtaskState`s for the individual operators.
    (f7b8ef9)
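    The breakdown of acknowledged task state can be sketched as follows. The
    sketch assumes that the operator IDs forwarded to each vertex are listed
    in the same order as the chained operator states a subtask acknowledges.
    `PendingCheckpointSketch`, its `acknowledge` method and `String` IDs and
    states are hypothetical. The real `PendingCheckpoint` and `SubtaskState`
    classes look different.

    ```java
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    /** Hypothetical sketch: splitting a task's acknowledged state into per-operator states. */
    final class PendingCheckpointSketch {
        // vertex ID -> operator IDs of its chain, same order as the acknowledged states
        private final Map<String, List<String>> operatorIdsPerVertex;
        // operator ID -> (subtask index -> state of that operator in that subtask)
        private final Map<String, Map<Integer, String>> operatorStates = new HashMap<>();

        PendingCheckpointSketch(Map<String, List<String>> operatorIdsPerVertex) {
            this.operatorIdsPerVertex = operatorIdsPerVertex;
        }

        /** Files each chained operator's state under its own operator ID. */
        void acknowledge(String vertexId, int subtaskIndex, List<String> chainedOperatorStates) {
            List<String> operatorIds = operatorIdsPerVertex.get(vertexId);
            if (operatorIds == null || operatorIds.size() != chainedOperatorStates.size()) {
                throw new IllegalArgumentException("Unexpected acknowledgement from " + vertexId);
            }
            for (int i = 0; i < operatorIds.size(); i++) {
                operatorStates
                        .computeIfAbsent(operatorIds.get(i), id -> new TreeMap<>())
                        .put(subtaskIndex, chainedOperatorStates.get(i));
            }
        }

        Map<String, Map<Integer, String>> operatorStates() {
            return operatorStates;
        }

        public static void main(String[] args) {
            PendingCheckpointSketch pending =
                    new PendingCheckpointSketch(Map.of("vertex-1", List.of("op-a", "op-b")));
            pending.acknowledge("vertex-1", 0, List.of("a0", "b0"));
            pending.acknowledge("vertex-1", 1, List.of("a1", "b1"));
            // prints {0=a0, 1=a1}
            System.out.println(pending.operatorStates().get("op-a"));
        }
    }
    ```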
    
    ## Tests
    
    The majority of this PR (roughly 60%) consists of new tests.
    
    Several tests were added under `flink-tests` that cover the migration path
    from 1.2 to 1.3.
    (d1efdb1)
    
    These tests first restore a job from a 1.2 savepoint without changes to the
    topology, verify that the state was restored correctly, and finally create
    a new savepoint. They then restore from this migrated 1.3 savepoint with
    changes to the topology for various scenarios, and again verify that the
    state is restored correctly.
    
    A new test was also added to `CheckpointCoordinatorTest` that tests the
    support for topology changes without executing a job.
    (b5430f9)
    
    A number of existing tests had to be tweaked to work with the new changes,
    but these changes all boil down to extending existing mocks with a method
    or two.
    (8b45b5a)
    
    ## Other changes
    
    To make it more obvious that we deal with operators and not tasks, a new
    `OperatorID` class was introduced, and usages of `JobVertexID` in
    savepoint-related parts were replaced where appropriate.
    (fe74023)
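    The benefit of a dedicated ID type can be sketched as follows.
    `SketchId`, `SketchOperatorId` and `SketchJobVertexId` are hypothetical
    stand-ins, not Flink's `OperatorID` or `JobVertexID`. Wrapping a `UUID` is
    likewise an assumption made only for this illustration.

    ```java
    import java.util.Objects;
    import java.util.UUID;

    /** Hypothetical base class: equality requires the same concrete ID type and the same value. */
    abstract class SketchId {
        private final UUID value;

        SketchId(UUID value) {
            this.value = Objects.requireNonNull(value);
        }

        @Override
        public boolean equals(Object o) {
            // an operator ID never equals a vertex ID, even if the wrapped values match
            return o != null && o.getClass() == getClass() && value.equals(((SketchId) o).value);
        }

        @Override
        public int hashCode() {
            return Objects.hash(getClass(), value);
        }
    }

    /** Identifies a single operator, independent of how it is chained. */
    final class SketchOperatorId extends SketchId {
        SketchOperatorId(UUID value) {
            super(value);
        }
    }

    /** Identifies a job vertex, i.e. a whole chain of operators. */
    final class SketchJobVertexId extends SketchId {
        SketchJobVertexId(UUID value) {
            super(value);
        }
    }
    ```

    With a `Map<SketchOperatorId, ...>`, `put` only accepts operator IDs, so
    the compiler catches a vertex ID being stored where an operator ID is
    expected. `Map.get` accepts any `Object`, so there the type-aware `equals`
    acts as a runtime guard by never matching a key of the other type.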
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zentol/flink 5982_operator_state

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3770.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3770
    
----
commit abe1bb9416b2a4159a3667d6845a4a9776abdc4f
Author: zentol <ches...@apache.org>
Date:   2017-04-03T15:39:50Z

    [prerequisite] Disable exception when assigning uid on chained operator

commit 74881a2788d034db67d99d6d32dbb2cf923aed53
Author: zentol <ches...@apache.org>
Date:   2017-04-04T10:53:56Z

    [internal] Adjust SavepointLoader to new Savepoint semantics

commit f7b8ef943097cd994a4ef3d5594fea4027720f5a
Author: zentol <ches...@apache.org>
Date:   2017-04-04T13:02:55Z

    [internal] adjust PendingCheckpoint to be in line with new semantics

commit 73427c3fc7d69f5d072d9d8a4809d449e0c5bdac
Author: zentol <ches...@apache.org>
Date:   2017-04-04T11:33:54Z

    [internal] Get operator ID's into ExecutionGraph

commit 465805792932cb888393d9257fdefd828fa59343
Author: zentol <ches...@apache.org>
Date:   2017-04-25T16:07:16Z

    [internals] Extract several utility methods from StateAssignmentOperation

commit 008e848715b7091c3deabc9251d9d673f5506e64
Author: guowei.mgw <guowei....@gmail.com>
Date:   2017-04-24T09:47:47Z

    [internal] Add new StateAssignmentOperation

commit ffb93298ce90956b9886b3526258f6a814b7e0af
Author: zentol <ches...@apache.org>
Date:   2017-04-04T13:01:07Z

    [internal] Integrate new StateAssignmentOperation version

commit d1efdb1c34d59f04147292b320528cd2bc838244
Author: zentol <ches...@apache.org>
Date:   2017-04-03T15:40:21Z

    [tests] Add tests for chain modifications

commit 8b45b5a77f2cc499fdbb41d8198ac0a2e25bb1d7
Author: zentol <ches...@apache.org>
Date:   2017-04-24T11:58:07Z

    [tests] Adjust existing tests

commit b5430f98bfbb56e49f9a8b21fe5b1e5dd7358714
Author: guowei.mgw <guowei....@gmail.com>
Date:   2017-04-24T10:13:44Z

    [tests] Add tests for topology modifications

commit fe7402358a89c37bd470437f9c3f05d7ff3d3ca1
Author: zentol <ches...@apache.org>
Date:   2017-04-25T14:08:07Z

    [internal] Introduce OperatorID for state business

----


