This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 15e137e  [hotfix][state] Improve memory-friendliness of state 
assignment by allocating collections with correct sizes or improved size 
estimates where possible
15e137e is described below

commit 15e137ec64b6d86ea67a65880c6f4ffb2c24afbd
Author: Stefan Richter <[email protected]>
AuthorDate: Wed Aug 8 14:12:22 2018 +0200

    [hotfix][state] Improve memory-friendliness of state assignment by 
allocating collections with correct sizes or improved size estimates where 
possible
---
 .../checkpoint/OperatorStateRepartitioner.java     |   3 +-
 .../RoundRobinOperatorStateRepartitioner.java      |  33 ++++---
 .../checkpoint/StateAssignmentOperation.java       | 103 +++++++++++++--------
 .../runtime/checkpoint/StateObjectCollection.java  |   9 ++
 .../checkpoint/CheckpointCoordinatorTest.java      |   7 +-
 5 files changed, 95 insertions(+), 60 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorStateRepartitioner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorStateRepartitioner.java
index 090f48a..ce2d403 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorStateRepartitioner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorStateRepartitioner.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.runtime.state.OperatorStateHandle;
 
-import java.util.Collection;
 import java.util.List;
 
 /**
@@ -36,7 +35,7 @@ public interface OperatorStateRepartitioner {
         * @return List with one entry per parallel subtask. Each subtask 
receives now one collection of states that build
         * of the new total state for this subtask.
         */
-       List<Collection<OperatorStateHandle>> repartitionState(
+       List<List<OperatorStateHandle>> repartitionState(
                        List<OperatorStateHandle> previousParallelSubtaskStates,
                        int newParallelism);
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java
index e6fa687..4705265 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java
@@ -26,11 +26,11 @@ import org.apache.flink.util.Preconditions;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * Current default implementation of {@link OperatorStateRepartitioner} that 
redistributes state in round robin fashion.
@@ -41,7 +41,7 @@ public class RoundRobinOperatorStateRepartitioner implements 
OperatorStateRepart
        private static final boolean OPTIMIZE_MEMORY_USE = false;
 
        @Override
-       public List<Collection<OperatorStateHandle>> repartitionState(
+       public List<List<OperatorStateHandle>> repartitionState(
                        List<OperatorStateHandle> previousParallelSubtaskStates,
                        int newParallelism) {
 
@@ -56,7 +56,7 @@ public class RoundRobinOperatorStateRepartitioner implements 
OperatorStateRepart
                }
 
                // Assemble result from all merge maps
-               List<Collection<OperatorStateHandle>> result = new 
ArrayList<>(newParallelism);
+               List<List<OperatorStateHandle>> result = new 
ArrayList<>(newParallelism);
 
                // Do the actual repartitioning for all named states
                List<Map<StreamStateHandle, OperatorStateHandle>> mergeMapList =
@@ -93,20 +93,19 @@ public class RoundRobinOperatorStateRepartitioner 
implements OperatorStateRepart
                                continue;
                        }
 
-                       for (Map.Entry<String, 
OperatorStateHandle.StateMetaInfo> e :
-                                       
psh.getStateNameToPartitionOffsets().entrySet()) {
+                       final Set<Map.Entry<String, 
OperatorStateHandle.StateMetaInfo>> partitionOffsetEntries =
+                               psh.getStateNameToPartitionOffsets().entrySet();
+
+                       for (Map.Entry<String, 
OperatorStateHandle.StateMetaInfo> e : partitionOffsetEntries) {
                                OperatorStateHandle.StateMetaInfo metaInfo = 
e.getValue();
 
                                Map<String, List<Tuple2<StreamStateHandle, 
OperatorStateHandle.StateMetaInfo>>> nameToState =
                                                
nameToStateByMode.get(metaInfo.getDistributionMode());
 
                                List<Tuple2<StreamStateHandle, 
OperatorStateHandle.StateMetaInfo>> stateLocations =
-                                               nameToState.get(e.getKey());
-
-                               if (stateLocations == null) {
-                                       stateLocations = new ArrayList<>();
-                                       nameToState.put(e.getKey(), 
stateLocations);
-                               }
+                                       nameToState.computeIfAbsent(
+                                               e.getKey(),
+                                               k -> new 
ArrayList<>(previousParallelSubtaskStates.size() * 
partitionOffsetEntries.size()));
 
                                stateLocations.add(new 
Tuple2<>(psh.getDelegateStateHandle(), e.getValue()));
                        }
@@ -203,7 +202,9 @@ public class RoundRobinOperatorStateRepartitioner 
implements OperatorStateRepart
                                        Map<StreamStateHandle, 
OperatorStateHandle> mergeMap = mergeMapList.get(parallelOpIdx);
                                        OperatorStateHandle operatorStateHandle 
= mergeMap.get(handleWithOffsets.f0);
                                        if (operatorStateHandle == null) {
-                                               operatorStateHandle = new 
OperatorStreamStateHandle(new HashMap<>(), handleWithOffsets.f0);
+                                               operatorStateHandle = new 
OperatorStreamStateHandle(
+                                                       new 
HashMap<>(distributeNameToState.size()),
+                                                       handleWithOffsets.f0);
                                                
mergeMap.put(handleWithOffsets.f0, operatorStateHandle);
                                        }
                                        
operatorStateHandle.getStateNameToPartitionOffsets().put(
@@ -229,7 +230,9 @@ public class RoundRobinOperatorStateRepartitioner 
implements OperatorStateRepart
                                for (Tuple2<StreamStateHandle, 
OperatorStateHandle.StateMetaInfo> handleWithMetaInfo : e.getValue()) {
                                        OperatorStateHandle operatorStateHandle 
= mergeMap.get(handleWithMetaInfo.f0);
                                        if (operatorStateHandle == null) {
-                                               operatorStateHandle = new 
OperatorStreamStateHandle(new HashMap<>(), handleWithMetaInfo.f0);
+                                               operatorStateHandle = new 
OperatorStreamStateHandle(
+                                                       new 
HashMap<>(broadcastNameToState.size()),
+                                                       handleWithMetaInfo.f0);
                                                
mergeMap.put(handleWithMetaInfo.f0, operatorStateHandle);
                                        }
                                        
operatorStateHandle.getStateNameToPartitionOffsets().put(e.getKey(), 
handleWithMetaInfo.f1);
@@ -256,7 +259,9 @@ public class RoundRobinOperatorStateRepartitioner 
implements OperatorStateRepart
 
                                OperatorStateHandle operatorStateHandle = 
mergeMap.get(handleWithMetaInfo.f0);
                                if (operatorStateHandle == null) {
-                                       operatorStateHandle = new 
OperatorStreamStateHandle(new HashMap<>(), handleWithMetaInfo.f0);
+                                       operatorStateHandle = new 
OperatorStreamStateHandle(
+                                               new 
HashMap<>(uniformBroadcastNameToState.size()),
+                                               handleWithMetaInfo.f0);
                                        mergeMap.put(handleWithMetaInfo.f0, 
operatorStateHandle);
                                }
                                
operatorStateHandle.getStateNameToPartitionOffsets().put(e.getKey(), 
handleWithMetaInfo.f1);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
index 592489f..b017388 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
@@ -24,11 +24,11 @@ import 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.OperatorInstanceID;
-import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
@@ -83,7 +83,7 @@ public class StateAssignmentOperation {
                        // find the states of all operators belonging to this 
task
                        List<OperatorID> operatorIDs = 
executionJobVertex.getOperatorIDs();
                        List<OperatorID> altOperatorIDs = 
executionJobVertex.getUserDefinedOperatorIDs();
-                       List<OperatorState> operatorStates = new ArrayList<>();
+                       List<OperatorState> operatorStates = new 
ArrayList<>(operatorIDs.size());
                        boolean statelessTask = true;
                        for (int x = 0; x < operatorIDs.size(); x++) {
                                OperatorID operatorID = altOperatorIDs.get(x) 
== null
@@ -124,7 +124,9 @@ public class StateAssignmentOperation {
                        executionJobVertex.getMaxParallelism(),
                        newParallelism);
 
-               /**
+               final int expectedNumberOfSubTasks = newParallelism * 
operatorIDs.size();
+
+               /*
                 * Redistribute ManagedOperatorStates and RawOperatorStates 
from old parallelism to new parallelism.
                 *
                 * The old ManagedOperatorStates with old parallelism 3:
@@ -143,8 +145,10 @@ public class StateAssignmentOperation {
                 * op2   state2,0         state2,1         state2,2             
state2,3
                 * op3   state3,0         state3,1         state3,2             
state3,3
                 */
-               Map<OperatorInstanceID, List<OperatorStateHandle>> 
newManagedOperatorStates = new HashMap<>();
-               Map<OperatorInstanceID, List<OperatorStateHandle>> 
newRawOperatorStates = new HashMap<>();
+               Map<OperatorInstanceID, List<OperatorStateHandle>> 
newManagedOperatorStates =
+                       new HashMap<>(expectedNumberOfSubTasks);
+               Map<OperatorInstanceID, List<OperatorStateHandle>> 
newRawOperatorStates =
+                       new HashMap<>(expectedNumberOfSubTasks);
 
                reDistributePartitionableStates(
                        operatorStates,
@@ -153,8 +157,10 @@ public class StateAssignmentOperation {
                        newManagedOperatorStates,
                        newRawOperatorStates);
 
-               Map<OperatorInstanceID, List<KeyedStateHandle>> 
newManagedKeyedState = new HashMap<>();
-               Map<OperatorInstanceID, List<KeyedStateHandle>> 
newRawKeyedState = new HashMap<>();
+               Map<OperatorInstanceID, List<KeyedStateHandle>> 
newManagedKeyedState =
+                       new HashMap<>(expectedNumberOfSubTasks);
+               Map<OperatorInstanceID, List<KeyedStateHandle>> 
newRawKeyedState =
+                       new HashMap<>(expectedNumberOfSubTasks);
 
                reDistributeKeyedStates(
                        operatorStates,
@@ -164,7 +170,7 @@ public class StateAssignmentOperation {
                        newManagedKeyedState,
                        newRawKeyedState);
 
-               /**
+               /*
                 *  An executionJobVertex's all state handles needed to restore 
are something like a matrix
                 *
                 *              parallelism0 parallelism1 parallelism2 
parallelism3
@@ -198,7 +204,7 @@ public class StateAssignmentOperation {
                        Execution currentExecutionAttempt = 
executionJobVertex.getTaskVertices()[subTaskIndex]
                                .getCurrentExecutionAttempt();
 
-                       TaskStateSnapshot taskState = new TaskStateSnapshot();
+                       TaskStateSnapshot taskState = new 
TaskStateSnapshot(operatorIDs.size());
                        boolean statelessTask = true;
 
                        for (OperatorID operatorID : operatorIDs) {
@@ -276,38 +282,34 @@ public class StateAssignmentOperation {
                        for (int subTaskIndex = 0; subTaskIndex < 
newParallelism; subTaskIndex++) {
                                OperatorInstanceID instanceID = 
OperatorInstanceID.of(subTaskIndex, newOperatorIDs.get(operatorIndex));
                                if (isHeadOperator(operatorIndex, 
newOperatorIDs)) {
-                                       Tuple2<Collection<KeyedStateHandle>, 
Collection<KeyedStateHandle>> subKeyedStates = reAssignSubKeyedStates(
+                                       Tuple2<List<KeyedStateHandle>, 
List<KeyedStateHandle>> subKeyedStates = reAssignSubKeyedStates(
                                                operatorState,
                                                newKeyGroupPartitions,
                                                subTaskIndex,
                                                newParallelism,
                                                oldParallelism);
-                                       newManagedKeyedState
-                                               .computeIfAbsent(instanceID, 
key -> new ArrayList<>())
-                                               .addAll(subKeyedStates.f0);
-                                       newRawKeyedState
-                                               .computeIfAbsent(instanceID, 
key -> new ArrayList<>())
-                                               .addAll(subKeyedStates.f1);
+                                       newManagedKeyedState.put(instanceID, 
subKeyedStates.f0);
+                                       newRawKeyedState.put(instanceID, 
subKeyedStates.f1);
                                }
                        }
                }
        }
 
        // TODO rewrite based on operator id
-       private Tuple2<Collection<KeyedStateHandle>, 
Collection<KeyedStateHandle>> reAssignSubKeyedStates(
+       private Tuple2<List<KeyedStateHandle>, List<KeyedStateHandle>> 
reAssignSubKeyedStates(
                        OperatorState operatorState,
                        List<KeyGroupRange> keyGroupPartitions,
                        int subTaskIndex,
                        int newParallelism,
                        int oldParallelism) {
 
-               Collection<KeyedStateHandle> subManagedKeyedState;
-               Collection<KeyedStateHandle> subRawKeyedState;
+               List<KeyedStateHandle> subManagedKeyedState;
+               List<KeyedStateHandle> subRawKeyedState;
 
                if (newParallelism == oldParallelism) {
                        if (operatorState.getState(subTaskIndex) != null) {
-                               subManagedKeyedState = 
operatorState.getState(subTaskIndex).getManagedKeyedState();
-                               subRawKeyedState = 
operatorState.getState(subTaskIndex).getRawKeyedState();
+                               subManagedKeyedState = 
operatorState.getState(subTaskIndex).getManagedKeyedState().asList();
+                               subRawKeyedState = 
operatorState.getState(subTaskIndex).getRawKeyedState().asList();
                        } else {
                                subManagedKeyedState = Collections.emptyList();
                                subRawKeyedState = Collections.emptyList();
@@ -336,8 +338,8 @@ public class StateAssignmentOperation {
                        "This method still depends on the order of the new and 
old operators");
 
                //collect the old partitionable state
-               List<List<OperatorStateHandle>> oldManagedOperatorStates = new 
ArrayList<>();
-               List<List<OperatorStateHandle>> oldRawOperatorStates = new 
ArrayList<>();
+               List<List<OperatorStateHandle>> oldManagedOperatorStates = new 
ArrayList<>(oldOperatorStates.size());
+               List<List<OperatorStateHandle>> oldRawOperatorStates = new 
ArrayList<>(oldOperatorStates.size());
 
                collectPartionableStates(oldOperatorStates, 
oldManagedOperatorStates, oldRawOperatorStates);
 
@@ -368,24 +370,29 @@ public class StateAssignmentOperation {
                        List<List<OperatorStateHandle>> rawOperatorStates) {
 
                for (OperatorState operatorState : operatorStates) {
+
+                       final int parallelism = operatorState.getParallelism();
+
                        List<OperatorStateHandle> managedOperatorState = null;
                        List<OperatorStateHandle> rawOperatorState = null;
 
-                       for (int i = 0; i < operatorState.getParallelism(); 
i++) {
+                       for (int i = 0; i < parallelism; i++) {
                                OperatorSubtaskState operatorSubtaskState = 
operatorState.getState(i);
                                if (operatorSubtaskState != null) {
 
+                                       
StateObjectCollection<OperatorStateHandle> managed = 
operatorSubtaskState.getManagedOperatorState();
+                                       
StateObjectCollection<OperatorStateHandle> raw = 
operatorSubtaskState.getRawOperatorState();
+
                                        if (managedOperatorState == null) {
-                                               managedOperatorState = new 
ArrayList<>();
+                                               managedOperatorState = new 
ArrayList<>(parallelism * managed.size());
                                        }
-                                       
managedOperatorState.addAll(operatorSubtaskState.getManagedOperatorState());
+                                       managedOperatorState.addAll(managed);
 
                                        if (rawOperatorState == null) {
-                                               rawOperatorState = new 
ArrayList<>();
+                                               rawOperatorState = new 
ArrayList<>(parallelism * raw.size());
                                        }
-                                       
rawOperatorState.addAll(operatorSubtaskState.getRawOperatorState());
+                                       rawOperatorState.addAll(raw);
                                }
-
                        }
                        managedOperatorStates.add(managedOperatorState);
                        rawOperatorStates.add(rawOperatorState);
@@ -404,12 +411,19 @@ public class StateAssignmentOperation {
                OperatorState operatorState,
                KeyGroupRange subtaskKeyGroupRange) {
 
-               List<KeyedStateHandle> subtaskKeyedStateHandles = new 
ArrayList<>();
+               final int parallelism = operatorState.getParallelism();
 
-               for (int i = 0; i < operatorState.getParallelism(); i++) {
+               List<KeyedStateHandle> subtaskKeyedStateHandles = null;
+
+               for (int i = 0; i < parallelism; i++) {
                        if (operatorState.getState(i) != null) {
 
                                Collection<KeyedStateHandle> keyedStateHandles 
= operatorState.getState(i).getManagedKeyedState();
+
+                               if (subtaskKeyedStateHandles == null) {
+                                       subtaskKeyedStateHandles = new 
ArrayList<>(parallelism * keyedStateHandles.size());
+                               }
+
                                extractIntersectingState(
                                        keyedStateHandles,
                                        subtaskKeyGroupRange,
@@ -432,11 +446,19 @@ public class StateAssignmentOperation {
                OperatorState operatorState,
                KeyGroupRange subtaskKeyGroupRange) {
 
-               List<KeyedStateHandle> extractedKeyedStateHandles = new 
ArrayList<>();
+               final int parallelism = operatorState.getParallelism();
 
-               for (int i = 0; i < operatorState.getParallelism(); i++) {
+               List<KeyedStateHandle> extractedKeyedStateHandles = null;
+
+               for (int i = 0; i < parallelism; i++) {
                        if (operatorState.getState(i) != null) {
+
                                Collection<KeyedStateHandle> rawKeyedState = 
operatorState.getState(i).getRawKeyedState();
+
+                               if (extractedKeyedStateHandles == null) {
+                                       extractedKeyedStateHandles = new 
ArrayList<>(parallelism * rawKeyedState.size());
+                               }
+
                                extractIntersectingState(
                                        rawKeyedState,
                                        subtaskKeyGroupRange,
@@ -565,19 +587,18 @@ public class StateAssignmentOperation {
                        List<OperatorStateHandle> chainOpParallelStates,
                        int oldParallelism,
                        int newParallelism) {
-               Map<OperatorInstanceID, List<OperatorStateHandle>> result = new 
HashMap<>();
 
-               List<Collection<OperatorStateHandle>> states = 
applyRepartitioner(
+               List<List<OperatorStateHandle>> states = applyRepartitioner(
                        opStateRepartitioner,
                        chainOpParallelStates,
                        oldParallelism,
                        newParallelism);
 
+               Map<OperatorInstanceID, List<OperatorStateHandle>> result = new 
HashMap<>(states.size());
+
                for (int subtaskIndex = 0; subtaskIndex < states.size(); 
subtaskIndex++) {
                        checkNotNull(states.get(subtaskIndex) != null, 
"states.get(subtaskIndex) is null");
-                       result
-                               
.computeIfAbsent(OperatorInstanceID.of(subtaskIndex, operatorID), key -> new 
ArrayList<>())
-                               .addAll(states.get(subtaskIndex));
+                       result.put(OperatorInstanceID.of(subtaskIndex, 
operatorID), states.get(subtaskIndex));
                }
 
                return result;
@@ -594,7 +615,7 @@ public class StateAssignmentOperation {
         * @return repartitioned state
         */
        // TODO rewrite based on operator id
-       public static List<Collection<OperatorStateHandle>> applyRepartitioner(
+       public static List<List<OperatorStateHandle>> applyRepartitioner(
                        OperatorStateRepartitioner opStateRepartitioner,
                        List<OperatorStateHandle> chainOpParallelStates,
                        int oldParallelism,
@@ -611,7 +632,7 @@ public class StateAssignmentOperation {
                                        chainOpParallelStates,
                                        newParallelism);
                } else {
-                       List<Collection<OperatorStateHandle>> repackStream = 
new ArrayList<>(newParallelism);
+                       List<List<OperatorStateHandle>> repackStream = new 
ArrayList<>(newParallelism);
                        for (OperatorStateHandle operatorStateHandle : 
chainOpParallelStates) {
 
                                if (operatorStateHandle != null) {
@@ -645,7 +666,7 @@ public class StateAssignmentOperation {
                Collection<? extends KeyedStateHandle> keyedStateHandles,
                KeyGroupRange subtaskKeyGroupRange) {
 
-               List<KeyedStateHandle> subtaskKeyedStateHandles = new 
ArrayList<>();
+               List<KeyedStateHandle> subtaskKeyedStateHandles = new 
ArrayList<>(keyedStateHandles.size());
 
                for (KeyedStateHandle keyedStateHandle : keyedStateHandles) {
                        KeyedStateHandle intersectedKeyedStateHandle = 
keyedStateHandle.getIntersection(subtaskKeyGroupRange);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateObjectCollection.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateObjectCollection.java
index 38e3d15..3076847 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateObjectCollection.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateObjectCollection.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.List;
 import java.util.function.Predicate;
 
 /**
@@ -178,6 +179,14 @@ public class StateObjectCollection<T extends StateObject> 
implements Collection<
                return "StateObjectCollection{" + stateObjects + '}';
        }
 
+       public List<T> asList() {
+               return stateObjects instanceof List ?
+                       (List<T>) stateObjects :
+                       stateObjects != null ?
+                               new ArrayList<>(stateObjects) :
+                               Collections.emptyList();
+       }
+
        // 
------------------------------------------------------------------------
        //  Helper methods.
        // 
------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 1b2062a..b113e12 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -53,11 +53,12 @@ import 
org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import 
org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.runtime.testutils.RecoverableCompletedCheckpointStore;
-import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -2741,7 +2742,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                OperatorStateHandle osh = new 
OperatorStreamStateHandle(metaInfoMap, new ByteStreamStateHandle("test", new 
byte[150]));
 
                OperatorStateRepartitioner repartitioner = 
RoundRobinOperatorStateRepartitioner.INSTANCE;
-               List<Collection<OperatorStateHandle>> repartitionedStates =
+               List<List<OperatorStateHandle>> repartitionedStates =
                                
repartitioner.repartitionState(Collections.singletonList(osh), 3);
 
                Map<String, Integer> checkCounts = new HashMap<>(3);
@@ -3331,7 +3332,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
 
                OperatorStateRepartitioner repartitioner = 
RoundRobinOperatorStateRepartitioner.INSTANCE;
 
-               List<Collection<OperatorStateHandle>> pshs =
+               List<List<OperatorStateHandle>> pshs =
                                
repartitioner.repartitionState(previousParallelOpInstanceStates, 
newParallelism);
 
                Map<StreamStateHandle, Map<String, List<Long>>> actual = new 
HashMap<>();

Reply via email to