StefanRRichter commented on a change in pull request #7711: 
[FLINK-11618][state] Refactor operator state repartition mechanism
URL: https://github.com/apache/flink/pull/7711#discussion_r259305356
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java
 ##########
 @@ -31,36 +32,59 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /**
  * Current default implementation of {@link OperatorStateRepartitioner} that 
redistributes state in round robin fashion.
  */
+@Internal
 public class RoundRobinOperatorStateRepartitioner implements 
OperatorStateRepartitioner {
 
        public static final OperatorStateRepartitioner INSTANCE = new 
RoundRobinOperatorStateRepartitioner();
        private static final boolean OPTIMIZE_MEMORY_USE = false;
 
        @Override
        public List<List<OperatorStateHandle>> repartitionState(
-                       List<OperatorStateHandle> previousParallelSubtaskStates,
+                       List<List<OperatorStateHandle>> 
previousParallelSubtaskStates,
+                       int oldParallelism,
                        int newParallelism) {
 
                Preconditions.checkNotNull(previousParallelSubtaskStates);
                Preconditions.checkArgument(newParallelism > 0);
-
-               // Reorganize: group by (State Name -> StreamStateHandle + 
Offsets)
-               GroupByStateNameResults nameToStateByMode = 
groupByStateName(previousParallelSubtaskStates);
-
-               if (OPTIMIZE_MEMORY_USE) {
-                       previousParallelSubtaskStates.clear(); // free for GC 
at to cost that old handles are no longer available
-               }
+               
Preconditions.checkArgument(previousParallelSubtaskStates.size() == 
oldParallelism,
+                       "This method still depends on the order of the new and 
old operators");
 
                // Assemble result from all merge maps
                List<List<OperatorStateHandle>> result = new 
ArrayList<>(newParallelism);
 
-               // Do the actual repartitioning for all named states
-               List<Map<StreamStateHandle, OperatorStateHandle>> mergeMapList =
+               List<Map<StreamStateHandle, OperatorStateHandle>> mergeMapList;
+
+               // We only round-robin repartition UNION state if new 
parallelism equals to the old one.
+               if (newParallelism == oldParallelism) {
+                       Map<String, List<Tuple2<StreamStateHandle, 
OperatorStateHandle.StateMetaInfo>>> unionStates = 
collectUnionStates(previousParallelSubtaskStates);
+
+                       if (unionStates.isEmpty()) {
 
 Review comment:
   It seems that you are right. Sorry about that, my mistake. Please ignore my 
comments on this point; the code works differently from what I had assumed.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to