StefanRRichter commented on a change in pull request #7711:
[FLINK-11618][state] Refactor operator state repartition mechanism
URL: https://github.com/apache/flink/pull/7711#discussion_r257260190
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java
##########
@@ -31,36 +32,59 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
/**
* Current default implementation of {@link OperatorStateRepartitioner} that redistributes state in round robin fashion.
*/
+@Internal
public class RoundRobinOperatorStateRepartitioner implements OperatorStateRepartitioner {
public static final OperatorStateRepartitioner INSTANCE = new RoundRobinOperatorStateRepartitioner();
private static final boolean OPTIMIZE_MEMORY_USE = false;
@Override
public List<List<OperatorStateHandle>> repartitionState(
- List<OperatorStateHandle> previousParallelSubtaskStates,
+ List<List<OperatorStateHandle>> previousParallelSubtaskStates,
+ int oldParallelism,
int newParallelism) {
Preconditions.checkNotNull(previousParallelSubtaskStates);
Preconditions.checkArgument(newParallelism > 0);
-
- // Reorganize: group by (State Name -> StreamStateHandle + Offsets)
- GroupByStateNameResults nameToStateByMode = groupByStateName(previousParallelSubtaskStates);
-
- if (OPTIMIZE_MEMORY_USE) {
- previousParallelSubtaskStates.clear(); // free for GC at to cost that old handles are no longer available
- }
+ Preconditions.checkArgument(previousParallelSubtaskStates.size() == oldParallelism,
+ "This method still depends on the order of the new and old operators");
// Assemble result from all merge maps
List<List<OperatorStateHandle>> result = new ArrayList<>(newParallelism);
- // Do the actual repartitioning for all named states
- List<Map<StreamStateHandle, OperatorStateHandle>> mergeMapList =
+ List<Map<StreamStateHandle, OperatorStateHandle>> mergeMapList;
+
+ // We only round-robin repartition UNION state if new parallelism equals to the old one.
+ if (newParallelism == oldParallelism) {
+ Map<String, List<Tuple2<StreamStateHandle, OperatorStateHandle.StateMetaInfo>>> unionStates = collectUnionStates(previousParallelSubtaskStates);
+
+ if (unionStates.isEmpty()) {
Review comment:
I think we also need to consider broadcast state. Broadcast state is
supposed to be redistributed similarly to operator state. So even if there is
only a single broadcast state, we need to replicate it to all subtasks, similar
to union state. I think the reason the test did not catch this is that it
seems a union state is always tested whenever broadcast state is tested.
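
To illustrate the point, here is a rough sketch of an early-exit check that also accounts for broadcast state. It uses a deliberately simplified, hypothetical model: the class `BroadcastAwareRepartitionSketch`, its `Mode` enum, and both methods are assumptions made for illustration and are not the actual Flink classes or the code in this PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified model for illustration; none of these names are Flink API. */
final class BroadcastAwareRepartitionSketch {

    /** Assumed distribution modes for the kinds of state discussed in this comment. */
    enum Mode { SPLIT_DISTRIBUTE, UNION, BROADCAST }

    /**
     * With unchanged parallelism, the previous assignment can be handed back
     * untouched only if no subtask holds UNION or BROADCAST state.
     * Each map goes from state name to its mode for one subtask.
     */
    static boolean canReuseOldAssignment(List<Map<String, Mode>> statesPerSubtask) {
        for (Map<String, Mode> subtask : statesPerSubtask) {
            for (Mode mode : subtask.values()) {
                if (mode == Mode.UNION || mode == Mode.BROADCAST) {
                    return false;
                }
            }
        }
        return true;
    }

    /** Gives every new subtask its own copy of all broadcast state names. */
    static List<List<String>> replicateToAll(List<String> broadcastStateNames, int newParallelism) {
        List<List<String>> result = new ArrayList<>(newParallelism);
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>(broadcastStateNames));
        }
        return result;
    }
}
```

In this model, a check that only looks for `UNION` would return the old assignment unchanged even when a lone broadcast state is present, which is the case the existing test does not seem to exercise.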