GitHub user zentol opened a pull request:
https://github.com/apache/flink/pull/3770
[FLINK-5892] Restore state on the operator level
## General
This PR is a collaboration between @guoweiM and myself, enabling Flink to
restore state on the operator level. This means that the topology of a job may
change with regard to chains when restoring from a 1.3 savepoint, allowing the
arbitrary addition, removal or modification of chains.
The cornerstone of this is a semantic change to savepoints; no structural
changes have been made to the `SavepointV0/1/2` classes or their serialized
format:
In 1.2, a savepoint contains the states of tasks. If a task consists of
multiple operators, the stored `TaskState` internally contains a list of
states, one entry for each operator.
In 1.3, a savepoint contains only the states of operators; the notion of
tasks is eliminated. If a task consists of multiple operators, we instead store
one `TaskState` for each operator, each of which internally contains a list of
states with a length of 1.
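To illustrate the semantic change, here is a minimal sketch that re-expresses 1.2-style task-level state as 1.3-style operator-level state. It does not use Flink's classes: `TaskLevelState`, `OperatorLevelState` and `toOperatorLevel` are hypothetical stand-ins, IDs and states are plain strings, and the chain order of operator IDs is assumed to match the order of the stored states.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: re-expressing task-level (1.2) state as operator-level (1.3) state. */
final class SavepointSemanticsSketch {

    /** 1.2 semantics: one entry per task; states.get(i) belongs to operatorIds.get(i) in the chain. */
    record TaskLevelState(String taskId, List<String> operatorIds, List<String> states) {
        TaskLevelState {
            if (operatorIds.size() != states.size()) {
                throw new IllegalArgumentException("expected one state per chained operator");
            }
        }
    }

    /** 1.3 semantics: one entry per operator, whose state list always has length 1. */
    record OperatorLevelState(String operatorId, List<String> states) {}

    static Map<String, OperatorLevelState> toOperatorLevel(List<TaskLevelState> tasks) {
        Map<String, OperatorLevelState> result = new LinkedHashMap<>();
        for (TaskLevelState task : tasks) {
            for (int i = 0; i < task.operatorIds().size(); i++) {
                String operatorId = task.operatorIds().get(i);
                // singletonList also accepts null, in case a stateless operator is
                // represented that way (an assumption of this sketch).
                OperatorLevelState entry =
                        new OperatorLevelState(operatorId, Collections.singletonList(task.states().get(i)));
                // The task boundary is dropped; only the operator ID identifies the state.
                if (result.putIfAbsent(operatorId, entry) != null) {
                    throw new IllegalStateException("duplicate operator ID " + operatorId);
                }
            }
        }
        return result;
    }
}
```

A task chaining `source` and `map` thus becomes two entries, `source` and `map`, each holding a single state.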
## Implementation
In order for this to work, a number of changes had to be made.
First and foremost, we required a new `StateAssignmentOperation` (SAO) that
is aware of operators.
(74881a2, 8be9c58, 4fa8bbd)
Since the SAO uses the `ExecutionGraph` classes to map the restored state,
it was necessary to forward the IDs of all contained operators from the
`StreamingJobGraphGenerator` to the `ExecutionJobVertex`.
(73427c3)
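Why forwarding the operator IDs matters can be sketched as follows: once every job vertex knows which operators it contains, restored state can be looked up per operator, no matter how the operators are grouped into chains. This is a hypothetical simplification, not the SAO's actual API: the `assign` method, string IDs and states are assumptions, rescaling is ignored, and rejecting unmatched state is simply a choice made for this sketch.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch: assigning operator-level state to a (possibly re-chained) topology. */
final class OperatorStateAssignmentSketch {

    /**
     * @param restored    operator ID -> state, as read from an operator-level savepoint
     * @param vertexToOps new topology: job vertex ID -> IDs of the operators chained in it
     * @return job vertex ID -> (operator ID -> state) for the operators that have restored state
     * @throws IllegalStateException if some restored state matches no operator in the new topology
     */
    static Map<String, Map<String, String>> assign(
            Map<String, String> restored, Map<String, List<String>> vertexToOps) {
        Map<String, Map<String, String>> assignment = new LinkedHashMap<>();
        Set<String> unmatched = new HashSet<>(restored.keySet());
        for (Map.Entry<String, List<String>> vertex : vertexToOps.entrySet()) {
            Map<String, String> perOperator = new LinkedHashMap<>();
            for (String operatorId : vertex.getValue()) {
                String state = restored.get(operatorId);
                if (state != null) {
                    perOperator.put(operatorId, state);
                    unmatched.remove(operatorId);
                }
                // Operators without restored state (e.g. newly added ones) start empty.
            }
            assignment.put(vertex.getKey(), perOperator);
        }
        if (!unmatched.isEmpty()) {
            throw new IllegalStateException("no operator found for restored state of " + unmatched);
        }
        return assignment;
    }
}
```

For example, if `source -> map` was one chain when the savepoint was taken and `map` is later chained with a new `filter` instead, both states are still found, because the lookup key is the operator ID rather than the task.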
The `PendingCheckpoint` class had to be adjusted to conform to the new
semantics: received `SubtaskState`s, each containing the state of a task, are
broken down into `SubtaskState`s for the individual operators.
(f7b8ef9)
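The breakdown on acknowledgement can be sketched like this. It is a hypothetical model, not `PendingCheckpoint`'s real interface: `acknowledgeTask` and `statesOf` are invented names, and it assumes each acknowledging subtask reports one state per chained operator, in chain order. Each entry is filed under its operator ID and subtask index, so the task that produced it is no longer part of the key.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of breaking a task's acknowledged state down into per-operator states. */
final class PendingCheckpointSketch {

    /** operator ID -> (subtask index -> that subtask's state for the operator) */
    private final Map<String, Map<Integer, String>> operatorStates = new HashMap<>();

    /**
     * Records the acknowledgement of one subtask.
     *
     * @param chainedOperatorIds operator IDs of the task's chain, in chain order
     * @param chainedStates      the subtask's states, one per chained operator, in the same order
     */
    void acknowledgeTask(int subtaskIndex, List<String> chainedOperatorIds, List<String> chainedStates) {
        if (chainedOperatorIds.size() != chainedStates.size()) {
            throw new IllegalArgumentException("expected one state per chained operator");
        }
        for (int i = 0; i < chainedOperatorIds.size(); i++) {
            operatorStates
                    .computeIfAbsent(chainedOperatorIds.get(i), id -> new TreeMap<>())
                    .put(subtaskIndex, chainedStates.get(i));
        }
    }

    /** Returns the per-subtask states collected so far for one operator (empty if none). */
    Map<Integer, String> statesOf(String operatorId) {
        return operatorStates.getOrDefault(operatorId, Map.of());
    }
}
```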
## Tests
The majority of this PR (roughly 60%) consists of new tests.
A number of tests were added under `flink-tests` that cover the migration
path from 1.2 to 1.3.
(d1efdb1)
These tests first restore a job from a 1.2 savepoint without changes to
the topology, verify that the state was restored correctly, and finally create
a new savepoint. They then restore from this migrated 1.3 savepoint with
changes to the topology for varying scenarios, and again verify that the state
was restored correctly.
A new test was also added to the `CheckpointCoordinatorTest` which tests
the support for topology changes without executing a job.
(b5430f9)
A number of existing tests had to be tweaked to run with the new changes,
but these changes all boil down to extending existing mocks by a method or two.
(8b45b5a)
## Other changes
To make it more obvious that we deal with operators and not tasks, a new
`OperatorID` class was introduced, and usages of `JobVertexID` in
savepoint-related parts were replaced where appropriate.
(fe74023)
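The benefit of a dedicated ID type can be sketched with two distinct wrapper types: a method that expects an operator ID no longer accepts a job vertex ID, and two IDs built from the same raw value do not compare equal across types. These records are hypothetical simplifications backed by a `UUID` for brevity; they do not model how Flink's own ID classes are implemented.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;

/** Hypothetical sketch: separate ID types for operators and job vertices. */
final class IdTypesSketch {

    /** Identifies a task (job vertex), which may contain several chained operators. */
    record JobVertexID(UUID value) {
        JobVertexID { Objects.requireNonNull(value, "value"); }
    }

    /** Identifies a single operator; used as the key for savepoint state. */
    record OperatorID(UUID value) {
        OperatorID { Objects.requireNonNull(value, "value"); }
    }

    /** Operator-level state store: the key type documents that states belong to operators. */
    static final class OperatorStateStore {
        private final Map<OperatorID, String> states = new HashMap<>();

        void put(OperatorID id, String state) {
            states.put(id, state);
        }

        // Declared with OperatorID, so passing a JobVertexID here fails to compile.
        String get(OperatorID id) {
            return states.get(id);
        }
    }
}
```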
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/zentol/flink 5982_operator_state
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/3770.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #3770
----
commit abe1bb9416b2a4159a3667d6845a4a9776abdc4f
Author: zentol
Date: 2017-04-03T15:39:50Z
[prerequisite] Disable exception when assigning uid on chained operator
commit 74881a2788d034db67d99d6d32dbb2cf923aed53
Author: zentol
Date: 2017-04-04T10:53:56Z
[internal] Adjust SavepointLoader to new Savepoint semantics
commit f7b8ef943097cd994a4ef3d5594fea4027720f5a
Author: zentol
Date: 2017-04-04T13:02:55Z
[internal] adjust PendingCheckpoint to be in line with new semantics
commit 73427c3fc7d69f5d072d9d8a4809d449e0c5bdac
Author: zentol
Date: 2017-04-04T11:33:54Z
[internal] Get operator ID's into ExecutionGraph
commit 465805792932cb888393d9257fdefd828fa59343
Author: zentol
Date: 2017-04-25T16:07:16Z
[internals] Extract several utility methods from StateAssignmentOperation
commit 008e848715b7091c3deabc9251d9d673f5506e64
Author: guowei.mgw
Date: 2017-04-24T09:47:47Z
[internal] Add new StateAssignmentOperation
commit ffb93298ce90956b9886b3526258f6a814b7e0af
Author: zentol
Date: 2017-04-04T13:01:07Z
[internal] Integrate new StateAssignmentOperation version
commit d1efdb1c34d59f04147292b320528cd2bc838244
Author: zentol
Date: 2017-04-03T15:40:21Z
[tests] Add tests for chain modifications
commit 8b45b5a77f2cc499fdbb41d8198ac0a2e25bb1d7
Author: zentol
Date: 2017-04-24T11:58:07Z
[tests] Adjust existing tests
commit b5430f98bfbb56e49f9a8b21fe5b1e5dd7358714
Author: guowei.mgw
Date: 2017-04-24T10:13:44Z
[tests] Add tests for topology modifications
commit fe7402358a89c37bd470437f9c3f05d7ff3d3ca1
Author: zentol
Date: 2017-04-25T14:08:07Z
[internal] Introduce OperatorID for state business
----