pnowojski commented on code in PR #24784:
URL: https://github.com/apache/flink/pull/24784#discussion_r1606820986
##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java:
##########
@@ -421,7 +423,27 @@ public void reDistributeInputChannelStates(TaskStateAssignment stateAssignment)
                 stateAssignment.oldState.get(stateAssignment.inputOperatorID);
         final List<List<InputChannelStateHandle>> inputOperatorState =
                 splitBySubtasks(inputState, OperatorSubtaskState::getInputChannelState);
-        if (inputState.getParallelism() == executionJobVertex.getParallelism()) {
+
+        boolean noNeedRescale =
+                stateAssignment.executionJobVertex.getJobVertex().getInputs().stream()
+                                .map(JobEdge::getDownstreamSubtaskStateMapper)
+                                .anyMatch(m -> !m.equals(SubtaskStateMapper.FULL))
+                        && stateAssignment.executionJobVertex.getInputs().stream()
+                                .map(IntermediateResult::getProducer)
+                                .map(vertexAssignments::get)
+                                .anyMatch(
+                                        taskStateAssignment -> {
+                                            final int oldParallelism =
+                                                    stateAssignment
+                                                            .oldState
+                                                            .get(stateAssignment.inputOperatorID)
+                                                            .getParallelism();
+                                            return oldParallelism
+                                                    == taskStateAssignment.executionJobVertex
+                                                            .getParallelism();
+                                        });
Review Comment:
Bumping:
> Maybe in StateAssignmentOperationTest create a unit test that has one FULL and one something else, and assert that the assigned states are as they should be?

> Does this bug have a test coverage? I mean, either was there some test failing or have you added a new test to cover for a future regression?

Sorry to bother you again, but the unit test that you have added still doesn't cover this bug. When I run the tests against your previous version of the code:
```
boolean noNeedRescale =
        stateAssignment.executionJobVertex.getJobVertex().getInputs().stream()
                        .map(JobEdge::getDownstreamSubtaskStateMapper)
                        .anyMatch(m -> !m.equals(SubtaskStateMapper.FULL))
                && stateAssignment.executionJobVertex.getInputs().stream()
                        .map(IntermediateResult::getProducer)
                        .map(vertexAssignments::get)
                        .anyMatch(
                                taskStateAssignment -> {
                                    final int oldParallelism =
                                            stateAssignment
                                                    .oldState
                                                    .get(stateAssignment.inputOperatorID)
                                                    .getParallelism();
                                    return oldParallelism
                                            == taskStateAssignment.executionJobVertex
                                                    .getParallelism();
                                });

if (inputState.getParallelism() == executionJobVertex.getParallelism() && !noNeedRescale) {
    stateAssignment.inputChannelStates.putAll(
            toInstanceMap(stateAssignment.inputOperatorID, inputOperatorState));
    return;
}
```
the tests in `StateAssignmentOperationTest` are still green.
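
To make concrete which inputs a regression test would need, here is a minimal, self-contained sketch of the two early-return conditions that appear in this thread (the removed `if` from the diff and the quoted `if` above). It does not use the real Flink classes: `Mapper`, `noNeedRescale(...)`, `earlyReturnRemoved(...)` and `earlyReturnQuoted(...)` are hypothetical stand-ins, and plain integers replace the vertex and state objects. It only shows where the two conditions disagree, not which result is the intended one.

```java
import java.util.List;

public class RescaleConditionSketch {
    // Hypothetical stand-in for Flink's SubtaskStateMapper; only two values are modeled.
    enum Mapper { FULL, ROUND_ROBIN }

    // Mirrors the quoted predicate: at least one input edge is not FULL, and at least
    // one upstream producer has the same parallelism as the old input state.
    static boolean noNeedRescale(
            List<Mapper> downstreamMappers, List<Integer> producerParallelisms, int oldParallelism) {
        return downstreamMappers.stream().anyMatch(m -> m != Mapper.FULL)
                && producerParallelisms.stream().anyMatch(p -> p == oldParallelism);
    }

    // The condition on the removed line of the diff.
    static boolean earlyReturnRemoved(int oldParallelism, int newParallelism) {
        return oldParallelism == newParallelism;
    }

    // The condition in the quoted "previous version".
    static boolean earlyReturnQuoted(int oldParallelism, int newParallelism, boolean noNeedRescale) {
        return oldParallelism == newParallelism && !noNeedRescale;
    }

    public static void main(String[] args) {
        // One FULL input and one non-FULL input, as suggested in the review.
        List<Mapper> mappers = List.of(Mapper.FULL, Mapper.ROUND_ROBIN);
        // Assumed upstream parallelisms; one of them matches the old state parallelism.
        List<Integer> producers = List.of(2, 3);
        int oldParallelism = 2;
        int newParallelism = 2;

        boolean flag = noNeedRescale(mappers, producers, oldParallelism);
        System.out.println("noNeedRescale      = " + flag);
        System.out.println("removed condition  = " + earlyReturnRemoved(oldParallelism, newParallelism));
        System.out.println("quoted condition   = " + earlyReturnQuoted(oldParallelism, newParallelism, flag));
    }
}
```

With unchanged parallelism, one non-FULL input and one matching upstream producer, the removed condition takes the early return and the quoted one does not. A test in `StateAssignmentOperationTest` that builds such a setup and asserts the assigned input channel states should therefore fail for at least one of the two versions.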