dawidwys commented on a change in pull request #16019:
URL: https://github.com/apache/flink/pull/16019#discussion_r643027417



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/InflightDataRescalingDescriptor.java
##########
@@ -101,50 +66,150 @@ public boolean equals(Object o) {
             return false;
         }
         InflightDataRescalingDescriptor that = (InflightDataRescalingDescriptor) o;
-        return Arrays.equals(oldSubtaskIndexes, that.oldSubtaskIndexes)
-                && Arrays.equals(rescaledChannelsMappings, that.rescaledChannelsMappings)
-                && Objects.equals(ambiguousSubtaskIndexes, that.ambiguousSubtaskIndexes);
+        return Arrays.equals(gateOrPartitionDescriptors, that.gateOrPartitionDescriptors);
     }
 
     @Override
     public int hashCode() {
-        int result = Objects.hash(ambiguousSubtaskIndexes);
-        result = 31 * result + Arrays.hashCode(oldSubtaskIndexes);
-        result = 31 * result + Arrays.hashCode(rescaledChannelsMappings);
-        return result;
+        return Arrays.hashCode(gateOrPartitionDescriptors);
     }
 
     @Override
     public String toString() {
         return "InflightDataRescalingDescriptor{"
-                + "oldSubtaskIndexes="
-                + Arrays.toString(oldSubtaskIndexes)
-                + ", rescaledChannelsMappings="
-                + Arrays.toString(rescaledChannelsMappings)
-                + ", ambiguousSubtaskIndexes="
-                + ambiguousSubtaskIndexes
+                + "gateOrPartitionDescriptors="
+                + Arrays.toString(gateOrPartitionDescriptors)
                 + '}';
     }
 
+    /**
+     * Captures ambiguous mappings of old channels to new channels.
+     *
+     * <p>For inputs, this mapping implies the following:
+     * <li>
+     *
+     *     <ul>
+     *       {@link #oldSubtaskIndexes} is set when there is a rescale on this task potentially
+     *       leading to different key groups. Upstream task has a corresponding {@link
+     *       #rescaledChannelsMappings} where it sends data over virtual channel while specifying
+     *       the channel index in the VirtualChannelSelector. This subtask then demultiplexes over
+     *       the virtual subtask index.
+     * </ul>
+     *
+     * <ul>
+     *   {@link #rescaledChannelsMappings} is set when there is a downscale of the upstream task.
+     *   Upstream task has a corresponding {@link #oldSubtaskIndexes} where it sends data over
+     *   virtual channel while specifying the subtask index in the VirtualChannelSelector. This
+     *   subtask then demultiplexes over channel indexes.
+     * </ul>
+     *
+     * <p>For outputs, it's vice-versa. The information must be kept in sync but they are used in
+     * opposite ways for multiplexing/demultiplexing.
+     *
+     * <p>Note that in the common rescaling case both information is set and need to be
+     * simultaneously used. If the input subtask subsumes the state of 3 old subtasks and a channel
+     * corresponds to 2 old channels, then there are 6 virtual channels to be demultiplexed.
+     */
+    public static class InflightDataGateOrPartitionRescalingDescriptor implements Serializable {
+
+        private static final long serialVersionUID = 1L;
+
+        /** Set when several operator instances are merged into one. */
+        private final int[] oldSubtaskIndexes;
+
+        /**
+         * Set when channels are merged because the connected operator has been rescaled for each
+         * gate/partition.
+         */
+        private final RescaleMappings rescaledChannelsMappings;
+
+        /** All channels where upstream duplicates data (only valid for downstream mappings). */
+        private final Set<Integer> ambiguousSubtaskIndexes;
+
+        private final Rescaling rescaling;
+
+        enum Rescaling {

Review comment:
       I will rename the `Rescaling` enum to `MappingType`.
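
For readers following the Javadoc in the hunk above, its closing example (3 old subtasks and 2 old channels giving 6 virtual channels) can be read as a cross product of the two index sets. The sketch below only illustrates that count. `VirtualChannelSketch` and `virtualChannels` are hypothetical names, not Flink classes or methods, and the actual demultiplexing in the PR may be organized differently.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not part of Flink or of this PR.
class VirtualChannelSketch {

    /**
     * Pairs every old subtask index with every old channel index.
     * Each returned int[] is {oldSubtaskIndex, oldChannelIndex}.
     */
    static List<int[]> virtualChannels(int[] oldSubtaskIndexes, int[] oldChannelIndexes) {
        List<int[]> result = new ArrayList<>();
        for (int subtask : oldSubtaskIndexes) {
            for (int channel : oldChannelIndexes) {
                result.add(new int[] {subtask, channel});
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // 3 old subtasks subsumed by this subtask, 2 old channels behind one new channel.
        List<int[]> channels = virtualChannels(new int[] {0, 1, 2}, new int[] {4, 5});
        System.out.println(channels.size()); // prints 6
    }
}
```

The index values 0, 1, 2 and 4, 5 are made up for the example; only the sizes of the two sets matter for the count.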





