GitHub user aljoscha opened a pull request:
https://github.com/apache/flink/pull/3778
[FLINK-5969] Add savepoint backwards compatibility tests from 1.2 to 1.3
The binary savepoints and snapshots in the tests were created on the commit
of the Flink 1.2.0 release, so we test backwards compatibility within the Flink
1.2.x line. Once this is approved I'll open another PR that transplants these
commits on the master branch (with the binary snapshots/savepoints done on
Flink 1.2.0) so that we test migration compatibility between 1.2.0 and what is
going to be Flink 1.3.x.
I changed the naming of some existing tests so we now have
`*From11MigrationTest` and `*From12MigrationTest` (and one ITCase). Immediately
after releasing Flink 1.3.0 we should do the same, i.e. introduce
`*From13MigrationTest` and an ITCase based on the existing tests.
The unit tests are fairly straightforward: we feed some data into an
operator using an operator test harness and then take a snapshot. (This is
the part that has to be done on the "old" version to generate the binary
snapshot that goes into the repo.) The actual tests restore an operator from
that snapshot and verify the output.
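The snapshot-then-restore round trip above can be sketched as follows. This is a minimal, self-contained illustration of the pattern only: plain Java serialization stands in for Flink's operator test harness and snapshot utilities, and the class and method names (`SnapshotRoundTrip`, `writeSnapshot`, `restoreCount`, `CountingState`) are made up for this sketch.

```java
import java.io.*;
import java.nio.file.*;
import java.util.*;

// Hypothetical stand-in for the harness-based pattern: the "old" version
// serializes operator state to a file (which would be checked into the
// repo), and the migration test restores it and verifies the contents.
public class SnapshotRoundTrip {

    // Minimal stand-in for an operator's state.
    static final class CountingState implements Serializable {
        long elementCount;
    }

    // Run on the "old" version: feed elements, then write the snapshot.
    static void writeSnapshot(Path file, List<String> elements) {
        CountingState state = new CountingState();
        state.elementCount = elements.size();
        try (ObjectOutputStream out =
                 new ObjectOutputStream(Files.newOutputStream(file))) {
            out.writeObject(state);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Run in the migration test: restore the snapshot and verify it.
    static long restoreCount(Path file) {
        try (ObjectInputStream in =
                 new ObjectInputStream(Files.newInputStream(file))) {
            return ((CountingState) in.readObject()).elementCount;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("snapshot", ".bin");
        writeSnapshot(file, Arrays.asList("a", "b", "c"));
        System.out.println(restoreCount(file)); // prints 3
    }
}
```

In the real tests the "write" half runs once on the old release and only the binary file travels forward; the "restore and verify" half is what stays in the test suite.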
The ITCase is a bit more involved. We have a complete job of user functions
and custom operators that tries to cover as many state/timer combinations as
possible. We start the job and, using accumulators, observe the number of
received elements in the sink. Once we get all elements we perform a savepoint
and cancel the job. Thus we have all state caused by the elements reflected in
our savepoint. This has to be done on the "old" version and the savepoint goes
into the repo. The restoring job is instrumented with code that verifies
the restored state and updates accumulators. We listen for accumulator changes
and cancel the job once we have seen all required verifications.
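The driver logic described above, observe a counter until all elements have arrived and only then savepoint and cancel, can be sketched like this. This is a hedged stand-in, not the actual ITCase code: a plain `AtomicLong` plays the role of the Flink accumulator, and the names (`AccumulatorDriver`, `onElement`, `readyForSavepoint`) are invented for the sketch.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the control loop: a stand-in "accumulator"
// counts elements seen by the sink, and the driver polls it, triggering
// the savepoint-and-cancel step once all expected elements arrived.
public class AccumulatorDriver {

    // Stand-in for a Flink accumulator reported by the sink.
    static final AtomicLong elementsSeen = new AtomicLong();

    // Called by the (simulated) sink for every received element.
    static void onElement() {
        elementsSeen.incrementAndGet();
    }

    // Poll the accumulator; returns true once the expected count is
    // reached, i.e. the point where the real test savepoints and cancels.
    static boolean readyForSavepoint(long expected) {
        return elementsSeen.get() >= expected;
    }

    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            onElement();
        }
        System.out.println(readyForSavepoint(5)); // prints true
    }
}
```

Waiting on an observed count rather than a fixed sleep is what guarantees that all state produced by the elements is actually reflected in the savepoint.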
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/aljoscha/flink
jira-5969-backwards-compat-12-13-on-release12
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/3778.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #3778
----
commit ef9e73a1f8af8903b0689eada2a9d853034fab88
Author: Aljoscha Krettek <[email protected]>
Date: 2017-04-20T12:48:22Z
[FLINK-5969] Augment SavepointMigrationTestBase to catch failed jobs
commit 47143ba424355b7d25e9990bc308ea1744a0f33e
Author: Aljoscha Krettek <[email protected]>
Date: 2017-04-20T15:09:00Z
[FLINK-5969] Add savepoint IT case that checks restore from 1.2
The binary savepoints in this were created on the Flink 1.2.0 release
commit.
commit 3803dc04caae5e57f2cb23df0b6bc4663f8af08e
Author: Aljoscha Krettek <[email protected]>
Date: 2017-04-21T09:43:53Z
[FLINK-6353] Fix legacy user-state restore from 1.2
State that was checkpointed using Checkpointed (on a user function)
could not be restored using CheckpointedRestoring when the savepoint was
done on Flink 1.2. The reason was an overzealous check in
AbstractUdfStreamOperator that only restored "legacy" operator
state using CheckpointedRestoring when the stream was a Migration stream.
This removes that check but we still need to make sure to read away the
byte that indicates whether there is legacy state, which is written when
we're restoring from a Flink 1.1 savepoint.
After this fix, the procedure for a user to migrate a user function away
from the Checkpointed interface is this:
- Perform a savepoint with the user function still implementing Checkpointed,
then shut down the job
- Change the user function to implement CheckpointedRestoring
- Restore from the previous savepoint; the user function has to somehow move
the state that is restored using CheckpointedRestoring to another
type of state, e.g. operator state, using the OperatorStateStore
- Perform another savepoint, then shut down the job
- Remove the CheckpointedRestoring interface from the user function
- Restore from the second savepoint
- Done.
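The state hand-off in the middle of those steps can be sketched as below. To keep the example self-contained it uses tiny stand-in types rather than Flink's real `CheckpointedRestoring` and `OperatorStateStore` interfaces; everything here (`MigrationSketch`, the nested interfaces, `MyFunction`) is invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the migration step where a user function
// receives its legacy Checkpointed state via restoreState() and
// immediately moves it into new-style operator state.
public class MigrationSketch {

    // Stand-in for the legacy restore-only interface.
    interface CheckpointedRestoring<T> { void restoreState(T state); }

    // Stand-in for the new-style operator state store.
    static final class OperatorStateStore {
        final List<Long> listState = new ArrayList<>();
    }

    // After the interface switch: the function only *restores* legacy
    // state and hands it straight to the new store, so the next
    // savepoint contains it in the new format.
    static final class MyFunction implements CheckpointedRestoring<Long> {
        final OperatorStateStore store;
        MyFunction(OperatorStateStore store) { this.store = store; }

        @Override
        public void restoreState(Long legacyState) {
            store.listState.add(legacyState);
        }
    }

    public static void main(String[] args) {
        OperatorStateStore store = new OperatorStateStore();
        new MyFunction(store).restoreState(42L);
        System.out.println(store.listState); // prints [42]
    }
}
```

Once the state lives only in the new store, a further savepoint no longer carries any legacy state, which is why the CheckpointedRestoring interface can (and must) be removed before the final restore.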
If the CheckpointedRestoring interface is not removed as prescribed in
the last steps, then a future restore of a new savepoint will fail
because Flink will try to read legacy operator state that is no longer
there. The above steps also apply to Flink 1.3, when a user wants to
move away from the Checkpointed interface.
commit f08661adcf3a64daf955ace70683ef2fe14cec2c
Author: Aljoscha Krettek <[email protected]>
Date: 2017-04-24T09:25:32Z
[FLINK-5969] Add ContinuousFileProcessingFrom12MigrationTest
The binary snapshots were created on the Flink 1.2 branch.
commit e70424eb6c9861e89c78f12143f319ce6eea49c1
Author: Aljoscha Krettek <[email protected]>
Date: 2017-04-24T10:31:53Z
[FLINK-5969] Add OperatorSnapshotUtil
This has methods for storing/reading OperatorStateHandles, as returned
from stream operator test harnesses. This can be used to write binary
snapshots for use in state migration tests.
commit 0217a2c3273157d4da936056fa5c76237d67b355
Author: Aljoscha Krettek <[email protected]>
Date: 2017-04-24T13:12:14Z
[FLINK-5969] Add KafkaConsumerBaseFrom12MigrationTest
The binary snapshots were created on the Flink 1.2 branch.
commit 6d3386bdb57e74ffecab76db211692aa734edf52
Author: Aljoscha Krettek <[email protected]>
Date: 2017-04-25T10:05:22Z
[FLINK-5969] Rename StatefulUDFSavepointFrom*MigrationITCases
commit f63e52c367bf85d328b9b6b3913ffe7dbd935d11
Author: Aljoscha Krettek <[email protected]>
Date: 2017-04-24T15:13:27Z
[FLINK-5969] Add WindowOperatorFrom12MigrationTest
The binary snapshots for this were created on the Flink 1.2 branch.
commit 525f98de5a90752918c7620ffaf2490d9c540452
Author: Aljoscha Krettek <[email protected]>
Date: 2017-04-24T15:13:49Z
[FLINK-5969] Also snapshot legacy state in operator test harness
commit 84fd38670dacf9f445f4361e85a494ad7512c3df
Author: Aljoscha Krettek <[email protected]>
Date: 2017-04-24T15:50:59Z
[FLINK-5969] Add BucketingSinkFrom12MigrationTest
The binary snapshots have been created on the Flink 1.2 branch.
----