ldadima commented on code in PR #24784:
URL: https://github.com/apache/flink/pull/24784#discussion_r1603115064
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/recovery/RescalingStreamTaskNetworkInput.java:
##########
@@ -232,7 +232,9 @@ public Predicate<StreamRecord<T>> apply(InputChannelInfo channelInfo) {
channelInfo.getGateIdx(), this::createPartitioner);
// use a copy of partitioner to ensure that the filter of ambiguous virtual channels
// have the same state across several subtasks
- return new RecordFilter<>(partitioner.copy(), inputSerializer, subtaskIndex);
+ StreamPartitioner<T> partitionerCopy = partitioner.copy();
+ partitionerCopy.setup(numberOfChannels);
+ return new RecordFilter<>(partitionerCopy, inputSerializer, subtaskIndex);
Review Comment:
While testing the reported bug, I found a small problem with copying the
partitioner in the rescaling case. It can throw an ArithmeticException
(`/ by zero`) for custom partitioners that compute something like
` ... % numberOfChannels`, because `numberOfChannels` is 0 after copying.
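
A minimal sketch of the failure mode, for illustration only. `ModuloPartitioner`
and `PartitionerCopyDemo` are hypothetical stand-ins, not Flink classes, and the
assumption that `copy()` yields an instance whose `numberOfChannels` is still 0
is taken from the description above rather than from Flink's implementation:

```java
class PartitionerCopyDemo {

    /** Hypothetical stand-in for a custom partitioner; not Flink's StreamPartitioner. */
    static class ModuloPartitioner {
        // Stays 0 until setup() is called.
        private int numberOfChannels;

        void setup(int numberOfChannels) {
            this.numberOfChannels = numberOfChannels;
        }

        // Assumption: the copy starts without channel information.
        ModuloPartitioner copy() {
            return new ModuloPartitioner();
        }

        int selectChannel(int key) {
            // Integer remainder by zero throws ArithmeticException.
            return key % numberOfChannels;
        }
    }

    /** Returns true if using a copy without setup() throws ArithmeticException. */
    public static boolean failsWithoutSetup() {
        ModuloPartitioner original = new ModuloPartitioner();
        original.setup(4);
        ModuloPartitioner copy = original.copy();
        try {
            copy.selectChannel(7);
            return false;
        } catch (ArithmeticException e) {
            return true;
        }
    }

    /** Mirrors the proposed change: call setup() on the copy before using it. */
    public static int channelAfterSetup(int key, int numberOfChannels) {
        ModuloPartitioner copy = new ModuloPartitioner().copy();
        copy.setup(numberOfChannels);
        return copy.selectChannel(key);
    }

    public static void main(String[] args) {
        System.out.println("copy without setup fails: " + failsWithoutSetup());
        System.out.println("channel after setup: " + channelAfterSetup(7, 4));
    }
}
```

Calling `setup(numberOfChannels)` on the copy, as the diff does, gives the
modulo a non-zero divisor before the filter uses the partitioner.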