[ 
https://issues.apache.org/jira/browse/FLINK-5019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15636456#comment-15636456
 ] 

ASF GitHub Bot commented on FLINK-5019:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2746#discussion_r86552393
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java ---
    @@ -63,164 +63,170 @@ public StateAssignmentOperation(
        public boolean assignStates() throws Exception {
     
                for (Map.Entry<JobVertexID, TaskState> taskGroupStateEntry : latest.getTaskStates().entrySet()) {
    +
                        TaskState taskState = taskGroupStateEntry.getValue();
                        ExecutionJobVertex executionJobVertex = tasks.get(taskGroupStateEntry.getKey());
     
    -                   if (executionJobVertex != null) {
    -                           // check that the number of key groups have not changed
    -                           if (taskState.getMaxParallelism() != executionJobVertex.getMaxParallelism()) {
    -                                   throw new IllegalStateException("The maximum parallelism (" +
    -                                                   taskState.getMaxParallelism() + ") with which the latest " +
    -                                                   "checkpoint of the execution job vertex " + executionJobVertex +
    -                                                   " has been taken and the current maximum parallelism (" +
    -                                                   executionJobVertex.getMaxParallelism() + ") changed. This " +
    -                                                   "is currently not supported.");
    +                   if (executionJobVertex == null) {
    --- End diff --
    
    The code should behave the same as before; I just found the if-nesting a little
    deep, and checking the preconditions first makes it more readable, imo.
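
    To illustrate the point about nesting, here is a minimal, self-contained
    sketch of the two styles. It is not the Flink code: the class
    GuardClauseSketch, both method names, and the use of plain maps from a
    vertex name to its maximum parallelism are hypothetical stand-ins, and
    skipping a missing vertex is an assumption made only for the example.

```java
import java.util.Map;

// Hypothetical, simplified stand-in for the loop in assignStates(); not Flink code.
final class GuardClauseSketch {

    /** Nested style: the main logic sits inside the null check. */
    static int assignNested(Map<String, Integer> checkpointed, Map<String, Integer> current) {
        int assigned = 0;
        for (Map.Entry<String, Integer> entry : checkpointed.entrySet()) {
            Integer currentMaxParallelism = current.get(entry.getKey());
            if (currentMaxParallelism != null) {
                if (!entry.getValue().equals(currentMaxParallelism)) {
                    throw new IllegalStateException("Maximum parallelism changed for " + entry.getKey());
                }
                assigned++;
            }
        }
        return assigned;
    }

    /** Guard-clause style: preconditions are checked first, the main logic stays flat. */
    static int assignWithGuards(Map<String, Integer> checkpointed, Map<String, Integer> current) {
        int assigned = 0;
        for (Map.Entry<String, Integer> entry : checkpointed.entrySet()) {
            Integer currentMaxParallelism = current.get(entry.getKey());
            if (currentMaxParallelism == null) {
                // Assumption for this sketch only: a vertex that no longer exists is skipped.
                continue;
            }
            if (!entry.getValue().equals(currentMaxParallelism)) {
                throw new IllegalStateException("Maximum parallelism changed for " + entry.getKey());
            }
            assigned++;
        }
        return assigned;
    }
}
```

    Both methods should return the same count and throw under the same
    conditions; only the control flow differs.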


> Proper isRestored result for tasks that did not write state
> -----------------------------------------------------------
>
>                 Key: FLINK-5019
>                 URL: https://issues.apache.org/jira/browse/FLINK-5019
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>
> When a subtask is restored from a checkpoint that does not contain any state 
> (e.g. because the subtask did not write state in the previous run), 
> {{StateInitializationContext::isRestored}} incorrectly returns false.
> We should ensure that empty state is somehow reflected in a checkpoint and 
> return true on restore, independent of the presence of state.
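
One way to picture the distinction described in the issue is to keep "no checkpoint at all" separate from "checkpoint with empty state". The sketch below is only an illustration under that assumption: RestoreContextSketch, freshStart, restoredFrom, and the use of string handles are hypothetical and do not reflect Flink's actual classes or the fix in the pull request.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical sketch; these names are not Flink's actual classes.
final class RestoreContextSketch {

    // null means "fresh start"; an empty list means "restored from a checkpoint without state".
    private final List<String> restoredStateHandles;

    private RestoreContextSketch(List<String> restoredStateHandles) {
        this.restoredStateHandles = restoredStateHandles;
    }

    /** The task starts without any checkpoint. */
    static RestoreContextSketch freshStart() {
        return new RestoreContextSketch(null);
    }

    /** The task is restored from a checkpoint; the checkpoint may hold no state for it. */
    static RestoreContextSketch restoredFrom(List<String> handles) {
        List<String> nonNull = (handles == null) ? Collections.<String>emptyList() : handles;
        return new RestoreContextSketch(nonNull);
    }

    /** True whenever a checkpoint was used, independent of whether it contained state. */
    boolean isRestored() {
        return restoredStateHandles != null;
    }

    /** State handles to read on restore; empty for a fresh start or an empty checkpoint. */
    List<String> handles() {
        return restoredStateHandles == null ? Collections.<String>emptyList() : restoredStateHandles;
    }
}
```

With this shape, a restore from an empty checkpoint reports isRestored() as true while handles() is empty, whereas a fresh start reports false.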



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
