This is an automated email from the ASF dual-hosted git repository.
srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 15e137e [hotfix][state] Improve memory-friendliness of state assignment by allocating collections with correct sizes or improved size estimates where possible
15e137e is described below
commit 15e137ec64b6d86ea67a65880c6f4ffb2c24afbd
Author: Stefan Richter <[email protected]>
AuthorDate: Wed Aug 8 14:12:22 2018 +0200
[hotfix][state] Improve memory-friendliness of state assignment by allocating collections with correct sizes or improved size estimates where possible
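
For illustration only (this sketch is not code from the patch; the names buildIndex and ENTRIES_PER_KEY are invented for the example): the pattern applied throughout the commit is to create collections with a known size or a best-effort estimate up front, and to use computeIfAbsent with a pre-sized list instead of the get / null-check / put idiom.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Standalone sketch of the allocation pattern: size collections up front where
// the final size (or a reasonable estimate) is already known, instead of
// relying on default capacities.
public class PreSizedCollectionsExample {

	// Hypothetical size estimate, analogous to
	// previousParallelSubtaskStates.size() * partitionOffsetEntries.size().
	private static final int ENTRIES_PER_KEY = 4;

	// The outer map is sized to the number of keys we already know; each inner
	// list is created via computeIfAbsent with an estimated capacity, so fewer
	// resize-and-copy steps happen while filling it.
	static Map<String, List<Integer>> buildIndex(List<String> keys, int valuesPerKey) {
		Map<String, List<Integer>> index = new HashMap<>(keys.size());
		for (String key : keys) {
			List<Integer> values = index.computeIfAbsent(key, k -> new ArrayList<>(valuesPerKey));
			for (int i = 0; i < valuesPerKey; i++) {
				values.add(i);
			}
		}
		return index;
	}

	public static void main(String[] args) {
		System.out.println(buildIndex(Arrays.asList("op1", "op2", "op3"), ENTRIES_PER_KEY));
	}
}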
---
.../checkpoint/OperatorStateRepartitioner.java | 3 +-
.../RoundRobinOperatorStateRepartitioner.java | 33 ++++---
.../checkpoint/StateAssignmentOperation.java | 103 +++++++++++++--------
.../runtime/checkpoint/StateObjectCollection.java | 9 ++
.../checkpoint/CheckpointCoordinatorTest.java | 7 +-
5 files changed, 95 insertions(+), 60 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorStateRepartitioner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorStateRepartitioner.java
index 090f48a..ce2d403 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorStateRepartitioner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorStateRepartitioner.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.runtime.state.OperatorStateHandle;
-import java.util.Collection;
import java.util.List;
/**
@@ -36,7 +35,7 @@ public interface OperatorStateRepartitioner {
* @return List with one entry per parallel subtask. Each subtask receives now one collection of states that build
* of the new total state for this subtask.
*/
- List<Collection<OperatorStateHandle>> repartitionState(
+ List<List<OperatorStateHandle>> repartitionState(
List<OperatorStateHandle> previousParallelSubtaskStates,
int newParallelism);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java
index e6fa687..4705265 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java
@@ -26,11 +26,11 @@ import org.apache.flink.util.Preconditions;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
/**
* Current default implementation of {@link OperatorStateRepartitioner} that redistributes state in round robin fashion.
@@ -41,7 +41,7 @@ public class RoundRobinOperatorStateRepartitioner implements OperatorStateRepart
private static final boolean OPTIMIZE_MEMORY_USE = false;
@Override
- public List<Collection<OperatorStateHandle>> repartitionState(
+ public List<List<OperatorStateHandle>> repartitionState(
List<OperatorStateHandle> previousParallelSubtaskStates,
int newParallelism) {
@@ -56,7 +56,7 @@ public class RoundRobinOperatorStateRepartitioner implements OperatorStateRepart
}
// Assemble result from all merge maps
- List<Collection<OperatorStateHandle>> result = new ArrayList<>(newParallelism);
+ List<List<OperatorStateHandle>> result = new ArrayList<>(newParallelism);
// Do the actual repartitioning for all named states
List<Map<StreamStateHandle, OperatorStateHandle>> mergeMapList =
@@ -93,20 +93,19 @@ public class RoundRobinOperatorStateRepartitioner implements OperatorStateRepart
continue;
}
- for (Map.Entry<String, OperatorStateHandle.StateMetaInfo> e :
- psh.getStateNameToPartitionOffsets().entrySet()) {
+ final Set<Map.Entry<String, OperatorStateHandle.StateMetaInfo>> partitionOffsetEntries =
+ psh.getStateNameToPartitionOffsets().entrySet();
+
+ for (Map.Entry<String, OperatorStateHandle.StateMetaInfo> e : partitionOffsetEntries) {
OperatorStateHandle.StateMetaInfo metaInfo = e.getValue();
Map<String, List<Tuple2<StreamStateHandle, OperatorStateHandle.StateMetaInfo>>> nameToState =
nameToStateByMode.get(metaInfo.getDistributionMode());
List<Tuple2<StreamStateHandle, OperatorStateHandle.StateMetaInfo>> stateLocations =
- nameToState.get(e.getKey());
-
- if (stateLocations == null) {
- stateLocations = new ArrayList<>();
- nameToState.put(e.getKey(), stateLocations);
- }
+ nameToState.computeIfAbsent(
+ e.getKey(),
+ k -> new ArrayList<>(previousParallelSubtaskStates.size() * partitionOffsetEntries.size()));
stateLocations.add(new Tuple2<>(psh.getDelegateStateHandle(), e.getValue()));
}
@@ -203,7 +202,9 @@ public class RoundRobinOperatorStateRepartitioner implements OperatorStateRepart
Map<StreamStateHandle, OperatorStateHandle> mergeMap = mergeMapList.get(parallelOpIdx);
OperatorStateHandle operatorStateHandle = mergeMap.get(handleWithOffsets.f0);
if (operatorStateHandle == null) {
- operatorStateHandle = new OperatorStreamStateHandle(new HashMap<>(), handleWithOffsets.f0);
+ operatorStateHandle = new OperatorStreamStateHandle(
+ new HashMap<>(distributeNameToState.size()),
+ handleWithOffsets.f0);
mergeMap.put(handleWithOffsets.f0, operatorStateHandle);
}
operatorStateHandle.getStateNameToPartitionOffsets().put(
@@ -229,7 +230,9 @@ public class RoundRobinOperatorStateRepartitioner implements OperatorStateRepart
for (Tuple2<StreamStateHandle, OperatorStateHandle.StateMetaInfo> handleWithMetaInfo : e.getValue()) {
OperatorStateHandle operatorStateHandle = mergeMap.get(handleWithMetaInfo.f0);
if (operatorStateHandle == null) {
- operatorStateHandle = new OperatorStreamStateHandle(new HashMap<>(), handleWithMetaInfo.f0);
+ operatorStateHandle = new OperatorStreamStateHandle(
+ new HashMap<>(broadcastNameToState.size()),
+ handleWithMetaInfo.f0);
mergeMap.put(handleWithMetaInfo.f0, operatorStateHandle);
}
operatorStateHandle.getStateNameToPartitionOffsets().put(e.getKey(), handleWithMetaInfo.f1);
@@ -256,7 +259,9 @@ public class RoundRobinOperatorStateRepartitioner implements OperatorStateRepart
OperatorStateHandle operatorStateHandle = mergeMap.get(handleWithMetaInfo.f0);
if (operatorStateHandle == null) {
- operatorStateHandle = new OperatorStreamStateHandle(new HashMap<>(), handleWithMetaInfo.f0);
+ operatorStateHandle = new OperatorStreamStateHandle(
+ new HashMap<>(uniformBroadcastNameToState.size()),
+ handleWithMetaInfo.f0);
mergeMap.put(handleWithMetaInfo.f0, operatorStateHandle);
}
operatorStateHandle.getStateNameToPartitionOffsets().put(e.getKey(), handleWithMetaInfo.f1);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
index 592489f..b017388 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
@@ -24,11 +24,11 @@ import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.OperatorInstanceID;
-import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
@@ -83,7 +83,7 @@ public class StateAssignmentOperation {
// find the states of all operators belonging to this task
List<OperatorID> operatorIDs = executionJobVertex.getOperatorIDs();
List<OperatorID> altOperatorIDs = executionJobVertex.getUserDefinedOperatorIDs();
- List<OperatorState> operatorStates = new ArrayList<>();
+ List<OperatorState> operatorStates = new ArrayList<>(operatorIDs.size());
boolean statelessTask = true;
for (int x = 0; x < operatorIDs.size(); x++) {
OperatorID operatorID = altOperatorIDs.get(x) == null
@@ -124,7 +124,9 @@ public class StateAssignmentOperation {
executionJobVertex.getMaxParallelism(),
newParallelism);
- /**
+ final int expectedNumberOfSubTasks = newParallelism * operatorIDs.size();
+
+ /*
* Redistribute ManagedOperatorStates and RawOperatorStates from old parallelism to new parallelism.
*
* The old ManagedOperatorStates with old parallelism 3:
@@ -143,8 +145,10 @@ public class StateAssignmentOperation {
* op2 state2,0 state2,1 state2,2 state2,3
* op3 state3,0 state3,1 state3,2 state3,3
*/
- Map<OperatorInstanceID, List<OperatorStateHandle>> newManagedOperatorStates = new HashMap<>();
- Map<OperatorInstanceID, List<OperatorStateHandle>> newRawOperatorStates = new HashMap<>();
+ Map<OperatorInstanceID, List<OperatorStateHandle>> newManagedOperatorStates =
+ new HashMap<>(expectedNumberOfSubTasks);
+ Map<OperatorInstanceID, List<OperatorStateHandle>> newRawOperatorStates =
+ new HashMap<>(expectedNumberOfSubTasks);
reDistributePartitionableStates(
operatorStates,
@@ -153,8 +157,10 @@ public class StateAssignmentOperation {
newManagedOperatorStates,
newRawOperatorStates);
- Map<OperatorInstanceID, List<KeyedStateHandle>> newManagedKeyedState = new HashMap<>();
- Map<OperatorInstanceID, List<KeyedStateHandle>> newRawKeyedState = new HashMap<>();
+ Map<OperatorInstanceID, List<KeyedStateHandle>> newManagedKeyedState =
+ new HashMap<>(expectedNumberOfSubTasks);
+ Map<OperatorInstanceID, List<KeyedStateHandle>> newRawKeyedState =
+ new HashMap<>(expectedNumberOfSubTasks);
reDistributeKeyedStates(
operatorStates,
@@ -164,7 +170,7 @@ public class StateAssignmentOperation {
newManagedKeyedState,
newRawKeyedState);
- /**
+ /*
* An executionJobVertex's all state handles needed to restore are something like a matrix
*
* parallelism0 parallelism1 parallelism2 parallelism3
@@ -198,7 +204,7 @@ public class StateAssignmentOperation {
Execution currentExecutionAttempt = executionJobVertex.getTaskVertices()[subTaskIndex]
.getCurrentExecutionAttempt();
- TaskStateSnapshot taskState = new TaskStateSnapshot();
+ TaskStateSnapshot taskState = new TaskStateSnapshot(operatorIDs.size());
boolean statelessTask = true;
for (OperatorID operatorID : operatorIDs) {
@@ -276,38 +282,34 @@ public class StateAssignmentOperation {
for (int subTaskIndex = 0; subTaskIndex < newParallelism; subTaskIndex++) {
OperatorInstanceID instanceID = OperatorInstanceID.of(subTaskIndex, newOperatorIDs.get(operatorIndex));
if (isHeadOperator(operatorIndex, newOperatorIDs)) {
- Tuple2<Collection<KeyedStateHandle>, Collection<KeyedStateHandle>> subKeyedStates = reAssignSubKeyedStates(
+ Tuple2<List<KeyedStateHandle>, List<KeyedStateHandle>> subKeyedStates = reAssignSubKeyedStates(
operatorState,
newKeyGroupPartitions,
subTaskIndex,
newParallelism,
oldParallelism);
- newManagedKeyedState
- .computeIfAbsent(instanceID, key -> new ArrayList<>())
- .addAll(subKeyedStates.f0);
- newRawKeyedState
- .computeIfAbsent(instanceID, key -> new ArrayList<>())
- .addAll(subKeyedStates.f1);
+ newManagedKeyedState.put(instanceID, subKeyedStates.f0);
+ newRawKeyedState.put(instanceID, subKeyedStates.f1);
}
}
}
}
// TODO rewrite based on operator id
- private Tuple2<Collection<KeyedStateHandle>, Collection<KeyedStateHandle>> reAssignSubKeyedStates(
+ private Tuple2<List<KeyedStateHandle>, List<KeyedStateHandle>> reAssignSubKeyedStates(
OperatorState operatorState,
List<KeyGroupRange> keyGroupPartitions,
int subTaskIndex,
int newParallelism,
int oldParallelism) {
- Collection<KeyedStateHandle> subManagedKeyedState;
- Collection<KeyedStateHandle> subRawKeyedState;
+ List<KeyedStateHandle> subManagedKeyedState;
+ List<KeyedStateHandle> subRawKeyedState;
if (newParallelism == oldParallelism) {
if (operatorState.getState(subTaskIndex) != null) {
- subManagedKeyedState = operatorState.getState(subTaskIndex).getManagedKeyedState();
- subRawKeyedState = operatorState.getState(subTaskIndex).getRawKeyedState();
+ subManagedKeyedState = operatorState.getState(subTaskIndex).getManagedKeyedState().asList();
+ subRawKeyedState = operatorState.getState(subTaskIndex).getRawKeyedState().asList();
} else {
subManagedKeyedState = Collections.emptyList();
subRawKeyedState = Collections.emptyList();
@@ -336,8 +338,8 @@ public class StateAssignmentOperation {
"This method still depends on the order of the new and
old operators");
//collect the old partitionable state
- List<List<OperatorStateHandle>> oldManagedOperatorStates = new ArrayList<>();
- List<List<OperatorStateHandle>> oldRawOperatorStates = new ArrayList<>();
+ List<List<OperatorStateHandle>> oldManagedOperatorStates = new ArrayList<>(oldOperatorStates.size());
+ List<List<OperatorStateHandle>> oldRawOperatorStates = new ArrayList<>(oldOperatorStates.size());
collectPartionableStates(oldOperatorStates, oldManagedOperatorStates, oldRawOperatorStates);
@@ -368,24 +370,29 @@ public class StateAssignmentOperation {
List<List<OperatorStateHandle>> rawOperatorStates) {
for (OperatorState operatorState : operatorStates) {
+
+ final int parallelism = operatorState.getParallelism();
+
List<OperatorStateHandle> managedOperatorState = null;
List<OperatorStateHandle> rawOperatorState = null;
- for (int i = 0; i < operatorState.getParallelism(); i++) {
+ for (int i = 0; i < parallelism; i++) {
OperatorSubtaskState operatorSubtaskState = operatorState.getState(i);
if (operatorSubtaskState != null) {
+
StateObjectCollection<OperatorStateHandle> managed = operatorSubtaskState.getManagedOperatorState();
+
StateObjectCollection<OperatorStateHandle> raw = operatorSubtaskState.getRawOperatorState();
+
if (managedOperatorState == null) {
- managedOperatorState = new ArrayList<>();
+ managedOperatorState = new ArrayList<>(parallelism * managed.size());
}
- managedOperatorState.addAll(operatorSubtaskState.getManagedOperatorState());
+ managedOperatorState.addAll(managed);
if (rawOperatorState == null) {
- rawOperatorState = new ArrayList<>();
+ rawOperatorState = new ArrayList<>(parallelism * raw.size());
}
- rawOperatorState.addAll(operatorSubtaskState.getRawOperatorState());
+ rawOperatorState.addAll(raw);
}
-
}
managedOperatorStates.add(managedOperatorState);
rawOperatorStates.add(rawOperatorState);
@@ -404,12 +411,19 @@ public class StateAssignmentOperation {
OperatorState operatorState,
KeyGroupRange subtaskKeyGroupRange) {
- List<KeyedStateHandle> subtaskKeyedStateHandles = new ArrayList<>();
+ final int parallelism = operatorState.getParallelism();
- for (int i = 0; i < operatorState.getParallelism(); i++) {
+ List<KeyedStateHandle> subtaskKeyedStateHandles = null;
+
+ for (int i = 0; i < parallelism; i++) {
if (operatorState.getState(i) != null) {
Collection<KeyedStateHandle> keyedStateHandles = operatorState.getState(i).getManagedKeyedState();
+
+ if (subtaskKeyedStateHandles == null) {
+ subtaskKeyedStateHandles = new ArrayList<>(parallelism * keyedStateHandles.size());
+ }
+
extractIntersectingState(
keyedStateHandles,
subtaskKeyGroupRange,
@@ -432,11 +446,19 @@ public class StateAssignmentOperation {
OperatorState operatorState,
KeyGroupRange subtaskKeyGroupRange) {
- List<KeyedStateHandle> extractedKeyedStateHandles = new ArrayList<>();
+ final int parallelism = operatorState.getParallelism();
- for (int i = 0; i < operatorState.getParallelism(); i++) {
+ List<KeyedStateHandle> extractedKeyedStateHandles = null;
+
+ for (int i = 0; i < parallelism; i++) {
if (operatorState.getState(i) != null) {
+
Collection<KeyedStateHandle> rawKeyedState = operatorState.getState(i).getRawKeyedState();
+
+ if (extractedKeyedStateHandles == null) {
+ extractedKeyedStateHandles = new ArrayList<>(parallelism * rawKeyedState.size());
+ }
+
extractIntersectingState(
rawKeyedState,
subtaskKeyGroupRange,
@@ -565,19 +587,18 @@ public class StateAssignmentOperation {
List<OperatorStateHandle> chainOpParallelStates,
int oldParallelism,
int newParallelism) {
- Map<OperatorInstanceID, List<OperatorStateHandle>> result = new HashMap<>();
- List<Collection<OperatorStateHandle>> states = applyRepartitioner(
+ List<List<OperatorStateHandle>> states = applyRepartitioner(
opStateRepartitioner,
chainOpParallelStates,
oldParallelism,
newParallelism);
+ Map<OperatorInstanceID, List<OperatorStateHandle>> result = new HashMap<>(states.size());
+
for (int subtaskIndex = 0; subtaskIndex < states.size(); subtaskIndex++) {
checkNotNull(states.get(subtaskIndex) != null, "states.get(subtaskIndex) is null");
- result
- .computeIfAbsent(OperatorInstanceID.of(subtaskIndex, operatorID), key -> new ArrayList<>())
- .addAll(states.get(subtaskIndex));
+ result.put(OperatorInstanceID.of(subtaskIndex, operatorID), states.get(subtaskIndex));
}
return result;
@@ -594,7 +615,7 @@ public class StateAssignmentOperation {
* @return repartitioned state
*/
// TODO rewrite based on operator id
- public static List<Collection<OperatorStateHandle>> applyRepartitioner(
+ public static List<List<OperatorStateHandle>> applyRepartitioner(
OperatorStateRepartitioner opStateRepartitioner,
List<OperatorStateHandle> chainOpParallelStates,
int oldParallelism,
@@ -611,7 +632,7 @@ public class StateAssignmentOperation {
chainOpParallelStates,
newParallelism);
} else {
- List<Collection<OperatorStateHandle>> repackStream = new ArrayList<>(newParallelism);
+ List<List<OperatorStateHandle>> repackStream = new ArrayList<>(newParallelism);
for (OperatorStateHandle operatorStateHandle : chainOpParallelStates) {
if (operatorStateHandle != null) {
@@ -645,7 +666,7 @@ public class StateAssignmentOperation {
Collection<? extends KeyedStateHandle> keyedStateHandles,
KeyGroupRange subtaskKeyGroupRange) {
- List<KeyedStateHandle> subtaskKeyedStateHandles = new ArrayList<>();
+ List<KeyedStateHandle> subtaskKeyedStateHandles = new ArrayList<>(keyedStateHandles.size());
for (KeyedStateHandle keyedStateHandle : keyedStateHandles) {
KeyedStateHandle intersectedKeyedStateHandle = keyedStateHandle.getIntersection(subtaskKeyGroupRange);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateObjectCollection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateObjectCollection.java
index 38e3d15..3076847 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateObjectCollection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateObjectCollection.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
+import java.util.List;
import java.util.function.Predicate;
/**
@@ -178,6 +179,14 @@ public class StateObjectCollection<T extends StateObject> implements Collection<
return "StateObjectCollection{" + stateObjects + '}';
}
+ public List<T> asList() {
+ return stateObjects instanceof List ?
+ (List<T>) stateObjects :
+ stateObjects != null ?
+ new ArrayList<>(stateObjects) :
+ Collections.emptyList();
+ }
+
// ------------------------------------------------------------------------
// Helper methods.
// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 1b2062a..b113e12 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -53,11 +53,12 @@ import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.RecoverableCompletedCheckpointStore;
-import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@@ -2741,7 +2742,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
OperatorStateHandle osh = new OperatorStreamStateHandle(metaInfoMap, new ByteStreamStateHandle("test", new byte[150]));
OperatorStateRepartitioner repartitioner = RoundRobinOperatorStateRepartitioner.INSTANCE;
- List<Collection<OperatorStateHandle>> repartitionedStates =
+ List<List<OperatorStateHandle>> repartitionedStates =
repartitioner.repartitionState(Collections.singletonList(osh), 3);
Map<String, Integer> checkCounts = new HashMap<>(3);
@@ -3331,7 +3332,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
OperatorStateRepartitioner repartitioner = RoundRobinOperatorStateRepartitioner.INSTANCE;
- List<Collection<OperatorStateHandle>> pshs =
+ List<List<OperatorStateHandle>> pshs =
repartitioner.repartitionState(previousParallelOpInstanceStates, newParallelism);
Map<StreamStateHandle, Map<String, List<Long>>> actual = new HashMap<>();