pnowojski commented on code in PR #24784:
URL: https://github.com/apache/flink/pull/24784#discussion_r1606820986


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java:
##########
@@ -421,7 +423,27 @@ public void 
reDistributeInputChannelStates(TaskStateAssignment stateAssignment)
                 stateAssignment.oldState.get(stateAssignment.inputOperatorID);
         final List<List<InputChannelStateHandle>> inputOperatorState =
                 splitBySubtasks(inputState, 
OperatorSubtaskState::getInputChannelState);
-        if (inputState.getParallelism() == 
executionJobVertex.getParallelism()) {
+
+        boolean noNeedRescale =
+                
stateAssignment.executionJobVertex.getJobVertex().getInputs().stream()
+                                .map(JobEdge::getDownstreamSubtaskStateMapper)
+                                .anyMatch(m -> 
!m.equals(SubtaskStateMapper.FULL))
+                        && 
stateAssignment.executionJobVertex.getInputs().stream()
+                                .map(IntermediateResult::getProducer)
+                                .map(vertexAssignments::get)
+                                .anyMatch(
+                                        taskStateAssignment -> {
+                                            final int oldParallelism =
+                                                    stateAssignment
+                                                            .oldState
+                                                            
.get(stateAssignment.inputOperatorID)
+                                                            .getParallelism();
+                                            return oldParallelism
+                                                    == 
taskStateAssignment.executionJobVertex
+                                                            .getParallelism();
+                                        });

Review Comment:
   > Does this bug have a test coverage? I mean, either was there some test 
failing or have you added a new test to cover for a future regression?
   
   
   Hey, sorry to bother you again, but the unit test that you have added still 
doesn't have test coverage. When I try running your previous version of the 
code:
   ```
           boolean noNeedRescale =
                   
stateAssignment.executionJobVertex.getJobVertex().getInputs().stream()
                                   
.map(JobEdge::getDownstreamSubtaskStateMapper)
                                   .anyMatch(m -> 
!m.equals(SubtaskStateMapper.FULL))
                           && 
stateAssignment.executionJobVertex.getInputs().stream()
                                   .map(IntermediateResult::getProducer)
                                   .map(vertexAssignments::get)
                                   .anyMatch(
                                           taskStateAssignment -> {
                                               final int oldParallelism =
                                                       stateAssignment
                                                               .oldState
                                                               
.get(stateAssignment.inputOperatorID)
                                                               
.getParallelism();
                                               return oldParallelism
                                                       == 
taskStateAssignment.executionJobVertex
                                                               
.getParallelism();
                                           });
   
           if (inputState.getParallelism() == 
executionJobVertex.getParallelism() && !noNeedRescale) {
               stateAssignment.inputChannelStates.putAll(
                       toInstanceMap(stateAssignment.inputOperatorID, 
inputOperatorState));
               return;
           }
   ```
   
   the tests in `StateAssignmentOperationTest` are still green.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to