AHeise commented on a change in pull request #13735:
URL: https://github.com/apache/flink/pull/13735#discussion_r518175408
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitioner.java
##########
@@ -43,4 +44,9 @@ public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
 	public String toString() {
 		return "FORWARD";
 	}
+
+	@Override
+	public ChannelStateRescaler getDownstreamChannelStateRescaler() {
+		return ChannelStateRescaler.FIRST_CHANNEL;
Review comment:
Really good catch. I was confused by the selection process of
ForwardPartitioner, which always selects channel 0. What we actually want is a
round-robin distribution: on upscaling, assign the same subtask as before; on
downscaling, redistribute to avoid data loss.
I used this finding to rethink the nomenclature. Everything related to
`ChannelStateRescaler` is really about subtask instance mapping, so I renamed
it to `SubtaskStateMapper`. With that name it becomes immediately apparent that
choosing the first task is plain wrong here.
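
To make the intended behaviour concrete, here is a minimal sketch of a
round-robin mapping from old to new subtask indexes. It is only an
illustration under assumptions: the class `RoundRobinMappingSketch` and the
method `oldSubtasksFor` are hypothetical names and are not the actual
`SubtaskStateMapper` code in this PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not Flink's SubtaskStateMapper implementation.
class RoundRobinMappingSketch {

    /**
     * Returns the old subtask indexes whose state the given new subtask would
     * restore, assuming old subtask i is assigned to new subtask i % newParallelism.
     */
    static List<Integer> oldSubtasksFor(int newSubtaskIndex, int oldParallelism, int newParallelism) {
        List<Integer> oldSubtasks = new ArrayList<>();
        // Walk the old indexes that are congruent to newSubtaskIndex modulo newParallelism.
        for (int old = newSubtaskIndex; old < oldParallelism; old += newParallelism) {
            oldSubtasks.add(old);
        }
        return oldSubtasks;
    }

    public static void main(String[] args) {
        // Upscaling from 2 to 4: new subtasks 0 and 1 keep their old state, 2 and 3 start empty.
        for (int i = 0; i < 4; i++) {
            System.out.println("2 -> 4, new subtask " + i + ": " + oldSubtasksFor(i, 2, 4));
        }
        // Downscaling from 4 to 2: every old subtask is assigned to some new subtask.
        for (int i = 0; i < 2; i++) {
            System.out.println("4 -> 2, new subtask " + i + ": " + oldSubtasksFor(i, 4, 2));
        }
    }
}
```

Under this assumption, upscaling leaves each existing subtask with exactly the
state it had before, and downscaling spreads all old subtasks over the
remaining ones, so no state is dropped. A mapping that always picks the first
subtask would not have either property.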