StefanRRichter commented on a change in pull request #7711:
[FLINK-11618][state] Refactor operator state repartition mechanism
URL: https://github.com/apache/flink/pull/7711#discussion_r259305356
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java
##########
@@ -31,36 +32,59 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
/**
* Current default implementation of {@link OperatorStateRepartitioner} that redistributes state in round robin fashion.
*/
+@Internal
public class RoundRobinOperatorStateRepartitioner implements OperatorStateRepartitioner {
public static final OperatorStateRepartitioner INSTANCE = new RoundRobinOperatorStateRepartitioner();
private static final boolean OPTIMIZE_MEMORY_USE = false;
@Override
public List<List<OperatorStateHandle>> repartitionState(
- List<OperatorStateHandle> previousParallelSubtaskStates,
+ List<List<OperatorStateHandle>> previousParallelSubtaskStates,
+ int oldParallelism,
int newParallelism) {
Preconditions.checkNotNull(previousParallelSubtaskStates);
Preconditions.checkArgument(newParallelism > 0);
-
- // Reorganize: group by (State Name -> StreamStateHandle + Offsets)
- GroupByStateNameResults nameToStateByMode = groupByStateName(previousParallelSubtaskStates);
-
- if (OPTIMIZE_MEMORY_USE) {
- previousParallelSubtaskStates.clear(); // free for GC at to cost that old handles are no longer available
- }
+ Preconditions.checkArgument(previousParallelSubtaskStates.size() == oldParallelism,
+ "This method still depends on the order of the new and old operators");
// Assemble result from all merge maps
List<List<OperatorStateHandle>> result = new ArrayList<>(newParallelism);
- // Do the actual repartitioning for all named states
- List<Map<StreamStateHandle, OperatorStateHandle>> mergeMapList =
+ List<Map<StreamStateHandle, OperatorStateHandle>> mergeMapList;
+
+ // We only round-robin repartition UNION state if new parallelism equals to the old one.
+ if (newParallelism == oldParallelism) {
+ Map<String, List<Tuple2<StreamStateHandle, OperatorStateHandle.StateMetaInfo>>> unionStates = collectUnionStates(previousParallelSubtaskStates);
+
+ if (unionStates.isEmpty()) {
Review comment:
It seems that you are right; sorry, my mistake. Please ignore my comments about that, since this works differently from what I thought.
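The hunk changes `repartitionState` to receive the old state grouped per old subtask together with `oldParallelism`, and adds a path that, when parallelism is unchanged, only collects and redistributes UNION state. The following is a heavily simplified, hypothetical Java model of that idea, not Flink code: `RoundRobinSketch`, `Partition`, and `Mode` are invented stand-ins for `OperatorStateHandle` and its distribution modes. Two further assumptions are labeled in the comments: that the `unionStates.isEmpty()` branch returns the old state unchanged, and that split-distribute partitions are dealt out with a plain round-robin cursor when parallelism changes, which is not necessarily how Flink balances them.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, simplified model of operator-state repartitioning.
 * NOT Flink's API: Partition and Mode only stand in for OperatorStateHandle
 * and its distribution modes, purely for illustration.
 */
final class RoundRobinSketch {

    /** Assumed distribution modes: split-distribute list state vs. union list state. */
    enum Mode { SPLIT_DISTRIBUTE, UNION }

    /** One partition of a named state written by an old subtask (hypothetical type). */
    static final class Partition {
        final String stateName;
        final Mode mode;
        final String payload;

        Partition(String stateName, Mode mode, String payload) {
            this.stateName = stateName;
            this.mode = mode;
            this.payload = payload;
        }
    }

    /** Outer list is indexed by old subtask, mirroring the new signature in the diff. */
    static List<List<Partition>> repartition(
            List<List<Partition>> previousParallelSubtaskStates,
            int oldParallelism,
            int newParallelism) {
        if (newParallelism <= 0) {
            throw new IllegalArgumentException("newParallelism must be > 0");
        }
        // Mirrors the new precondition: exactly one state list per old subtask, in order.
        if (previousParallelSubtaskStates.size() != oldParallelism) {
            throw new IllegalArgumentException("expected one state list per old subtask");
        }

        // ASSUMPTION: unchanged parallelism and no union state means nothing has to move,
        // so the old per-subtask assignment is returned as-is.
        boolean hasUnionState = previousParallelSubtaskStates.stream()
                .flatMap(List::stream)
                .anyMatch(p -> p.mode == Mode.UNION);
        if (newParallelism == oldParallelism && !hasUnionState) {
            return previousParallelSubtaskStates;
        }

        List<List<Partition>> result = new ArrayList<>(newParallelism);
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>());
        }

        List<Partition> unionPartitions = new ArrayList<>();
        int next = 0; // round-robin cursor over the new subtasks
        for (int i = 0; i < oldParallelism; i++) {
            for (Partition p : previousParallelSubtaskStates.get(i)) {
                if (p.mode == Mode.UNION) {
                    unionPartitions.add(p);
                } else if (newParallelism == oldParallelism) {
                    // Same parallelism: split state stays on the subtask with the same index.
                    result.get(i).add(p);
                } else {
                    // ASSUMPTION: simple round-robin dealing of split-distribute partitions.
                    result.get(next).add(p);
                    next = (next + 1) % newParallelism;
                }
            }
        }

        // Union state: every new subtask receives every union partition.
        for (List<Partition> target : result) {
            target.addAll(unionPartitions);
        }
        return result;
    }
}
```

For example, with old state `[[u, a], [c]]` where only `u` is union state, repartitioning from parallelism 2 to 2 keeps `a` and `c` on their original subtasks and yields `[[a, u], [c, u]]`, which is why the per-subtask grouping and the order check on `oldParallelism` matter for the unchanged-parallelism path.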