[
https://issues.apache.org/jira/browse/FLINK-4731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15552276#comment-15552276
]
ASF GitHub Bot commented on FLINK-4731:
---------------------------------------
Github user uce commented on a diff in the pull request:
https://github.com/apache/flink/pull/2584#discussion_r82213262
--- Diff:
flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
---
@@ -663,28 +826,42 @@ public Integer snapshotState(long checkpointId, long
checkpointTimestamp) throws
public void restoreState(Integer state) throws Exception {
counter = state;
}
+ }
+
+ private static class PartitionedStateSource extends StateSourceBase
implements ListCheckpointed<Integer> {
+
+ private static final long serialVersionUID =
-359715965103593462L;
+ private static final int NUM_PARTITIONS = 7;
+
+ private static int[] CHECK_CORRECT_SNAPSHOT;
+ private static int[] CHECK_CORRECT_RESTORE;
@Override
- public void run(SourceContext<Integer> ctx) throws Exception {
- final Object lock = ctx.getCheckpointLock();
+ public List<Integer> snapshotState(long checkpointId, long
timestamp) throws Exception {
- while (running) {
- synchronized (lock) {
- counter++;
+
CHECK_CORRECT_SNAPSHOT[getRuntimeContext().getIndexOfThisSubtask()] = counter;
- ctx.collect(counter *
getRuntimeContext().getIndexOfThisSubtask());
- }
+ int div = counter / NUM_PARTITIONS;
+ int mod = counter % NUM_PARTITIONS;
- Thread.sleep(2);
- if(counter == 10) {
- workStartedLatch.countDown();
+ List<Integer> split = new ArrayList<>();
+ for (int i = 0; i < NUM_PARTITIONS; ++i) {
+ int partitionValue = div;
+ if (mod > 0) {
+ --mod;
+ ++partitionValue;
}
+ split.add(partitionValue);
}
+ return split;
}
@Override
- public void cancel() {
- running = false;
+ public void restoreState(List<Integer> state) throws Exception {
+ for(Integer v : state) {
--- End diff --
Missing whitespace after `for`: this should read `for (Integer v : state) {`.
> HeapKeyedStateBackend restoring broken for scale-in
> ---------------------------------------------------
>
> Key: FLINK-4731
> URL: https://issues.apache.org/jira/browse/FLINK-4731
> Project: Flink
> Issue Type: Bug
> Components: State Backends, Checkpointing
> Reporter: Stefan Richter
> Assignee: Stefan Richter
>
> Restoring the HeapKeyedStateBackend is broken when parallelism is
> reduced: the restore method overwrites previously restored state.
> We should also add scale-in testing to the RescalingITCase.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)