GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/3770
[FLINK-5892] Restore state on the operator level

## General

This PR is a collaboration between @guoweiM and myself, enabling Flink to restore state on the operator level. This means that the topology of a job may change with regard to chaining when restoring from a 1.3 savepoint, allowing the arbitrary addition, removal, or modification of chains.

The cornerstone for this is a semantic change for savepoints; no structural changes have been made to the `SavepointV0/1/2` classes or their serialized format:

In 1.2 a savepoint contains the states of tasks. If a task consists of multiple operators, the stored TaskState internally contains a list of states, one entry for each operator.

In 1.3 a savepoint contains the states of operators only; the notion of tasks is eliminated. If a task consists of multiple operators, we instead store one TaskState for each operator, each of which internally contains a list of states with a length of 1.

## Implementation

In order for this to work, a number of changes had to be made.

First and foremost, we required a new `StateAssignmentOperation` that is aware of operators. (74881a2, 8be9c58, 4fa8bbd)

Since the SAO uses the `ExecutionGraph` classes to map the restored state, it was necessary to forward the IDs of all contained operators from the `StreamingJobGraphGenerator` to the `ExecutionJobVertex`. (73427c3)

The `PendingCheckpoint` class had to be adjusted to conform to the new semantics; received `SubtaskStates`, each containing the state of a task, are broken down into `SubtaskStates` for the individual operators. (f7b8ef9)

## Tests

The majority of this PR consists of new tests (60% or so).

A number of tests were added under flink-tests that test the migration path from 1.2 to 1.3. (d1efdb1) These tests first restore a job from a 1.2 savepoint without changes to the topology, verify that the state was restored correctly, and finally create a new savepoint.
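To make the semantic change concrete, the migration these tests exercise (splitting task-level 1.2 state into per-operator entries, then restoring those entries into a re-chained topology) can be modeled very roughly as below. This is a hypothetical toy sketch, not Flink code: `OperatorStateModel`, `TaskLevelState`, `toOperatorLevel` and `assignToChains` are made-up names, operator IDs and states are plain strings, and parallelism, key groups and the actual `SavepointV0/1/2` format are ignored.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical toy model of the savepoint semantics change; not Flink code.
 * Operator IDs and states are plain strings, and parallelism is ignored.
 */
public class OperatorStateModel {

    /** 1.2-style entry: one per task, with one state per chained operator, in chain order. */
    record TaskLevelState(List<String> operatorIds, List<String> operatorStates) {
        TaskLevelState {
            if (operatorIds.size() != operatorStates.size()) {
                throw new IllegalArgumentException("need exactly one state per operator");
            }
        }
    }

    /** Splits task-level entries into 1.3-style entries: a single state per operator ID. */
    static Map<String, String> toOperatorLevel(List<TaskLevelState> tasks) {
        Map<String, String> byOperator = new LinkedHashMap<>();
        for (TaskLevelState task : tasks) {
            for (int i = 0; i < task.operatorIds().size(); i++) {
                byOperator.put(task.operatorIds().get(i), task.operatorStates().get(i));
            }
        }
        return byOperator;
    }

    /**
     * Restores operator-level state into a possibly re-chained topology: each new
     * chain collects the states of the operators it now contains. Operators with
     * no restored state (e.g. newly added ones) get null in this toy model.
     */
    static List<List<String>> assignToChains(Map<String, String> byOperator,
                                             List<List<String>> newChains) {
        List<List<String>> assigned = new ArrayList<>();
        for (List<String> chain : newChains) {
            List<String> states = new ArrayList<>();
            for (String operatorId : chain) {
                states.add(byOperator.get(operatorId));
            }
            assigned.add(states);
        }
        return assigned;
    }

    public static void main(String[] args) {
        // 1.2 savepoint: task 1 chains operators A -> B, task 2 runs C alone.
        List<TaskLevelState> taskLevel = List.of(
                new TaskLevelState(List.of("A", "B"), List.of("stateA", "stateB")),
                new TaskLevelState(List.of("C"), List.of("stateC")));
        Map<String, String> operatorLevel = toOperatorLevel(taskLevel);

        // New topology: A runs alone, B is now chained with C.
        List<List<String>> restored =
                assignToChains(operatorLevel, List.of(List.of("A"), List.of("B", "C")));
        System.out.println(restored); // prints [[stateA], [stateB, stateC]]
    }
}
```

Keying the restored state by operator rather than by task is what lets `B` move from the `A` chain to the `C` chain without losing its state; in this sketch an operator with no restored state simply receives `null`.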
The tests then restore from this migrated 1.3 savepoint with topology changes for various scenarios, and verify again that the state was restored correctly.

A new test was also added to the `CheckpointCoordinatorTest` which tests the support for topology changes without executing a job. (8b5430f9)

A number of existing tests had to be tweaked to run with the new changes, but these changes all boil down to extending existing mocks by a method or two. (b5430f9)

## Other changes

To make it more obvious that we deal with operators and not tasks, a new `OperatorID` class was introduced, and usages of `JobVertexID` in savepoint-related parts were replaced where appropriate. (fe74023)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zentol/flink 5982_operator_state

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3770.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #3770

----

commit abe1bb9416b2a4159a3667d6845a4a9776abdc4f
Author: zentol <ches...@apache.org>
Date:   2017-04-03T15:39:50Z

    [prerequisite] Disable exception when assigning uid on chained operator

commit 74881a2788d034db67d99d6d32dbb2cf923aed53
Author: zentol <ches...@apache.org>
Date:   2017-04-04T10:53:56Z

    [internal] Adjust SavepointLoader to new Savepoint semantics

commit f7b8ef943097cd994a4ef3d5594fea4027720f5a
Author: zentol <ches...@apache.org>
Date:   2017-04-04T13:02:55Z

    [internal] adjust PendingCheckpoint to be in line with new semantics

commit 73427c3fc7d69f5d072d9d8a4809d449e0c5bdac
Author: zentol <ches...@apache.org>
Date:   2017-04-04T11:33:54Z

    [internal] Get operator ID's into ExecutionGraph

commit 465805792932cb888393d9257fdefd828fa59343
Author: zentol <ches...@apache.org>
Date:   2017-04-25T16:07:16Z

    [internals] Extract several utility methods from StateAssignmentOperation

commit 008e848715b7091c3deabc9251d9d673f5506e64
Author: guowei.mgw <guowei....@gmail.com>
Date:   2017-04-24T09:47:47Z

    [internal] Add new StateAssignmentOperation

commit ffb93298ce90956b9886b3526258f6a814b7e0af
Author: zentol <ches...@apache.org>
Date:   2017-04-04T13:01:07Z

    [internal] Integrate new StateAssignmentOperation version

commit d1efdb1c34d59f04147292b320528cd2bc838244
Author: zentol <ches...@apache.org>
Date:   2017-04-03T15:40:21Z

    [tests] Add tests for chain modifications

commit 8b45b5a77f2cc499fdbb41d8198ac0a2e25bb1d7
Author: zentol <ches...@apache.org>
Date:   2017-04-24T11:58:07Z

    [tests] Adjust existing tests

commit b5430f98bfbb56e49f9a8b21fe5b1e5dd7358714
Author: guowei.mgw <guowei....@gmail.com>
Date:   2017-04-24T10:13:44Z

    [tests] Add tests for topology modifications

commit fe7402358a89c37bd470437f9c3f05d7ff3d3ca1
Author: zentol <ches...@apache.org>
Date:   2017-04-25T14:08:07Z

    [internal] Introduce OperatorID for state business

----