StefanRRichter commented on a change in pull request #7711: 
[FLINK-11618][state] Refactor operator state repartition mechanism
URL: https://github.com/apache/flink/pull/7711#discussion_r257260190
 
 

 ##########
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java
 ##########
 @@ -31,36 +32,59 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /**
  * Current default implementation of {@link OperatorStateRepartitioner} that redistributes state in round robin fashion.
  */
+@Internal
 public class RoundRobinOperatorStateRepartitioner implements OperatorStateRepartitioner {
 
 	public static final OperatorStateRepartitioner INSTANCE = new RoundRobinOperatorStateRepartitioner();
        private static final boolean OPTIMIZE_MEMORY_USE = false;
 
        @Override
 	public List<List<OperatorStateHandle>> repartitionState(
-			List<OperatorStateHandle> previousParallelSubtaskStates,
+			List<List<OperatorStateHandle>> previousParallelSubtaskStates,
+			int oldParallelism,
 			int newParallelism) {
 
 		Preconditions.checkNotNull(previousParallelSubtaskStates);
 		Preconditions.checkArgument(newParallelism > 0);
-
-		// Reorganize: group by (State Name -> StreamStateHandle + Offsets)
-		GroupByStateNameResults nameToStateByMode = groupByStateName(previousParallelSubtaskStates);
-
-		if (OPTIMIZE_MEMORY_USE) {
-			previousParallelSubtaskStates.clear(); // free for GC, at the cost that old handles are no longer available
-		}
+		Preconditions.checkArgument(previousParallelSubtaskStates.size() == oldParallelism,
+			"This method still depends on the order of the new and old operators");
 
 		// Assemble result from all merge maps
 		List<List<OperatorStateHandle>> result = new ArrayList<>(newParallelism);
 
-		// Do the actual repartitioning for all named states
-		List<Map<StreamStateHandle, OperatorStateHandle>> mergeMapList =
+		List<Map<StreamStateHandle, OperatorStateHandle>> mergeMapList;
+
+		// We only round-robin repartition UNION state if the new parallelism equals the old one.
+		if (newParallelism == oldParallelism) {
+			Map<String, List<Tuple2<StreamStateHandle, OperatorStateHandle.StateMetaInfo>>> unionStates = collectUnionStates(previousParallelSubtaskStates);
+
+			if (unionStates.isEmpty()) {
 Review comment:
   I think we also need to consider broadcast state. Broadcast state is supposed to be redistributed similarly to union state: even if there is only a single broadcast state, we need to replicate it to all subtasks. I think the reason the test did not catch this is that a union state always seems to be tested alongside whenever broadcast state is tested.
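  To make the concern concrete, here is a minimal sketch of the replication semantics I mean. It uses plain `String`s in place of `OperatorStateHandle`, and the method name `replicateToAllSubtasks` is hypothetical, not part of the repartitioner's API:

```java
import java.util.ArrayList;
import java.util.List;

public class BroadcastRepartitionSketch {

	// Hypothetical helper: every broadcast state handle is copied to each of the
	// new subtasks, just like union state. Strings stand in for OperatorStateHandle
	// so the sketch stays self-contained.
	static List<List<String>> replicateToAllSubtasks(List<String> handles, int newParallelism) {
		List<List<String>> result = new ArrayList<>(newParallelism);
		for (int i = 0; i < newParallelism; i++) {
			// Each subtask receives the full set of handles, not a round-robin share.
			result.add(new ArrayList<>(handles));
		}
		return result;
	}

	public static void main(String[] args) {
		// Even a single broadcast state must show up in every one of the 3 subtasks.
		List<List<String>> out = replicateToAllSubtasks(List.of("broadcast-h1"), 3);
		System.out.println(out);
	}
}
```

  The point is that the `unionStates.isEmpty()` shortcut above must not skip this replication when only broadcast state is present.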

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
