[FLINK-5499] [JobManager] Make the location preferences combined by state and 
inputs.

Reusing the prior location (for state locality) takes precedence over input 
locality.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b9ed4ff1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b9ed4ff1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b9ed4ff1

Branch: refs/heads/master
Commit: b9ed4ff151c5d3a64be395c660160b5619e32c7f
Parents: fe4fe58
Author: Stephan Ewen <[email protected]>
Authored: Tue Jan 31 20:34:33 2017 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Fri Feb 3 12:46:14 2017 +0100

----------------------------------------------------------------------
 .../runtime/executiongraph/ExecutionVertex.java |  83 +++++--
 .../apache/flink/runtime/instance/SlotPool.java |   6 +-
 .../instance/SlotSharingGroupAssignment.java    |   6 +-
 .../runtime/jobmanager/scheduler/Scheduler.java |   4 +-
 .../runtime/jobmanager/slots/AllocatedSlot.java |   2 +-
 .../ExecutionVertexLocalityTest.java            | 244 +++++++++++++++++++
 .../scheduler/SchedulerTestUtils.java           |   6 +-
 7 files changed, 323 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b9ed4ff1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 0bb3514..cb2e177 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -47,6 +47,7 @@ import org.apache.flink.runtime.util.EvictingBoundedList;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
+
 import org.slf4j.Logger;
 
 import java.io.IOException;
@@ -264,20 +265,21 @@ public class ExecutionVertex implements 
AccessExecutionVertex, Archiveable<Archi
        }
 
        /**
-        * Just return the last assigned resource location if found
-        *
-        * @return The collection of TaskManagerLocation
+        * Gets the location where the latest completed/canceled/failed 
execution of the vertex's
+        * task happened.
+        * 
+        * @return The latest prior execution location, or null, if there is 
none, yet.
         */
-       public List<TaskManagerLocation> getPriorAssignedResourceLocations() {
-               List<TaskManagerLocation> list = new ArrayList<>();
-               for (int i = priorExecutions.size() - 1 ; i >= 0 ; i--) {
-                       Execution prior = priorExecutions.get(i) ;
-                       if (prior.getAssignedResourceLocation() != null) {
-                               list.add(prior.getAssignedResourceLocation());
-                               break;
+       public TaskManagerLocation getLatestPriorLocation() {
+               synchronized (priorExecutions) {
+                       final int size = priorExecutions.size();
+                       if (size > 0) {
+                               return priorExecutions.get(size - 
1).getAssignedResourceLocation();
+                       }
+                       else {
+                               return null;
                        }
                }
-               return list;
        }
 
        EvictingBoundedList<Execution> getCopyOfPriorExecutionsList() {
@@ -398,14 +400,61 @@ public class ExecutionVertex implements 
AccessExecutionVertex, Archiveable<Archi
        }
 
        /**
-        * Gets the location preferences of this task, determined by the 
locations of the predecessors from which
-        * it receives input data.
+        * Gets the overall preferred execution location for this vertex's 
current execution.
+        * The preference is determined as follows:
+        * 
+        * <ol>
+        *     <li>If the task execution has state to load (from a checkpoint), 
then the location preference
+        *         is the location of the previous execution (if there is a 
previous execution attempt).
+        *     <li>If the task execution has no state or no previous location, 
then the location preference
+        *         is based on the task's inputs.
+        * </ol>
+        * 
+        * These rules should result in the following behavior:
+        * 
+        * <ul>
+        *     <li>Stateless tasks are always scheduled based on co-location 
with inputs.
+        *     <li>Stateful tasks are on their initial attempt executed based 
on co-location with inputs.
+        *     <li>Repeated executions of stateful tasks try to co-locate the 
execution with its state.
+        * </ul>
+        * 
+        * @return The preferred execution locations for the execution attempt.
+        * 
+        * @see #getPreferredLocationsBasedOnState()
+        * @see #getPreferredLocationsBasedOnInputs() 
+        */
+       public Iterable<TaskManagerLocation> getPreferredLocations() {
+               Iterable<TaskManagerLocation> basedOnState = 
getPreferredLocationsBasedOnState();
+               return basedOnState != null ? basedOnState : 
getPreferredLocationsBasedOnInputs();
+       }
+       
+       /**
+        * Gets the preferred location to execute the current task execution 
attempt, based on the state
+        * that the execution attempt will resume.
+        * 
+        * @return A size-one iterable with the location preference, or null, 
if there is no
+        *         location preference based on the state.
+        */
+       public Iterable<TaskManagerLocation> 
getPreferredLocationsBasedOnState() {
+               TaskManagerLocation priorLocation;
+               if (currentExecution.getTaskStateHandles() != null && 
(priorLocation = getLatestPriorLocation()) != null) {
+                       return Collections.singleton(priorLocation);
+               }
+               else {
+                       return null;
+               }
+       }
+
+       /**
+        * Gets the location preferences of the vertex's current task 
execution, as determined by the locations
+        * of the predecessors from which it receives input data.
         * If there are more than MAX_DISTINCT_LOCATIONS_TO_CONSIDER different 
locations of source data, this
         * method returns {@code null} to indicate no location preference.
         *
-        * @return The preferred locations for this vertex execution, or null, 
if there is no preference.
+        * @return The preferred locations based on input streams, or an empty 
iterable,
+        *         if there is no input-based preference.
         */
-       public Iterable<TaskManagerLocation> getPreferredLocations() {
+       public Iterable<TaskManagerLocation> 
getPreferredLocationsBasedOnInputs() {
                // otherwise, base the preferred locations on the input 
connections
                if (inputEdges == null) {
                        return Collections.emptySet();
@@ -435,7 +484,7 @@ public class ExecutionVertex implements 
AccessExecutionVertex, Archiveable<Archi
                                        }
                                }
                                // keep the locations of the input with the 
least preferred locations
-                               if(locations.isEmpty() || // nothing assigned 
yet
+                               if (locations.isEmpty() || // nothing assigned 
yet
                                                (!inputLocations.isEmpty() && 
inputLocations.size() < locations.size())) {
                                        // current input has fewer preferred 
locations
                                        locations.clear();
@@ -443,7 +492,7 @@ public class ExecutionVertex implements 
AccessExecutionVertex, Archiveable<Archi
                                }
                        }
 
-                       return locations;
+                       return locations.isEmpty() ? 
Collections.<TaskManagerLocation>emptyList() : locations;
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b9ed4ff1/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index 6fac3c8..672431e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -984,8 +984,10 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> 
{
 
                @Override
                public Future<SimpleSlot> allocateSlot(ScheduledUnit task, 
boolean allowQueued) {
-                       return gateway.allocateSlot(task, 
ResourceProfile.UNKNOWN,
-                                       
task.getTaskToExecute().getVertex().getPriorAssignedResourceLocations(), 
timeout);
+                       Iterable<TaskManagerLocation> locationPreferences = 
+                                       
task.getTaskToExecute().getVertex().getPreferredLocations();
+
+                       return gateway.allocateSlot(task, 
ResourceProfile.UNKNOWN, locationPreferences, timeout);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b9ed4ff1/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
index 346cc77..88fbc10 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
@@ -262,7 +262,7 @@ public class SlotSharingGroupAssignment {
 
        /**
         * Gets a slot suitable for the given task vertex. This method will 
prefer slots that are local
-        * (with respect to {@link ExecutionVertex#getPreferredLocations()}), 
but will return non local
+        * (with respect to {@link 
ExecutionVertex#getPreferredLocationsBasedOnInputs()}), but will return non 
local
         * slots if no local slot is available. The method returns null, when 
this sharing group has
         * no slot is available for the given JobVertexID. 
         *
@@ -271,7 +271,7 @@ public class SlotSharingGroupAssignment {
         * @return A slot to execute the given ExecutionVertex in, or null, if 
none is available.
         */
        public SimpleSlot getSlotForTask(ExecutionVertex vertex) {
-               return getSlotForTask(vertex.getJobvertexId(), 
vertex.getPreferredLocations());
+               return getSlotForTask(vertex.getJobvertexId(), 
vertex.getPreferredLocationsBasedOnInputs());
        }
 
        /**
@@ -313,7 +313,7 @@ public class SlotSharingGroupAssignment {
         *         shared slot is available.
         */
        public SimpleSlot getSlotForTask(ExecutionVertex vertex, 
CoLocationConstraint constraint) {
-               return getSlotForTask(constraint, 
vertex.getPreferredLocations());
+               return getSlotForTask(constraint, 
vertex.getPreferredLocationsBasedOnInputs());
        }
        
        SimpleSlot getSlotForTask(CoLocationConstraint constraint, 
Iterable<TaskManagerLocation> locationPreferences) {

http://git-wip-us.apache.org/repos/asf/flink/blob/b9ed4ff1/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index 466a148..dc82440 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -174,7 +174,7 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener, Sl
 
                final ExecutionVertex vertex = 
task.getTaskToExecute().getVertex();
                
-               final Iterable<TaskManagerLocation> preferredLocations = 
vertex.getPreferredLocations();
+               final Iterable<TaskManagerLocation> preferredLocations = 
vertex.getPreferredLocationsBasedOnInputs();
                final boolean forceExternalLocation = false &&
                                                                        
preferredLocations != null && preferredLocations.iterator().hasNext();
        
@@ -240,7 +240,7 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener, Sl
                                                localOnly = true;
                                        }
                                        else {
-                                               locations = 
vertex.getPreferredLocations();
+                                               locations = 
vertex.getPreferredLocationsBasedOnInputs();
                                                localOnly = 
forceExternalLocation;
                                        }
                                        

http://git-wip-us.apache.org/repos/asf/flink/blob/b9ed4ff1/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
index 269a8f3..4910862 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
@@ -65,7 +65,7 @@ public class AllocatedSlot {
                        JobID jobID,
                        TaskManagerLocation location,
                        int slotNumber,
-                       ResourceProfile resourceProfile,                
+                       ResourceProfile resourceProfile,
                        TaskManagerGateway taskManagerGateway) {
                this.slotAllocationId = checkNotNull(slotAllocationId);
                this.jobID = checkNotNull(jobID);

http://git-wip-us.apache.org/repos/asf/flink/blob/b9ed4ff1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
new file mode 100644
index 0000000..36b7575
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.execution.ExecutionState;
+import 
org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.state.TaskStateHandles;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.net.InetAddress;
+import java.util.Iterator;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Mockito.*;
+import static org.junit.Assert.*;
+
+/**
+ * Tests that the execution vertex handles locality preferences well.
+ */
+public class ExecutionVertexLocalityTest extends TestLogger {
+
+       private final JobID jobId = new JobID();
+
+       private final JobVertexID sourceVertexId = new JobVertexID();
+       private final JobVertexID targetVertexId = new JobVertexID();
+
+       /**
+        * This test validates that vertices that have only one input stream 
try to
+        * co-locate their tasks with the producer.
+        */
+       @Test
+       public void testLocalityInputBasedForward() throws Exception {
+               final int parallelism = 10;
+               final TaskManagerLocation[] locations = new 
TaskManagerLocation[parallelism];
+
+               final ExecutionGraph graph = createTestGraph(parallelism, 
false);
+
+               // set the location for all sources to a distinct location
+               for (int i = 0; i < parallelism; i++) {
+                       ExecutionVertex source = 
graph.getAllVertices().get(sourceVertexId).getTaskVertices()[i];
+
+                       TaskManagerLocation location = new TaskManagerLocation(
+                                       ResourceID.generate(), 
InetAddress.getLoopbackAddress(), 10000 + i);
+
+                       locations[i] = location;
+                       initializeLocation(source, location);
+               }
+
+               // validate that each target vertex prefers its source's location
+               for (int i = 0; i < parallelism; i++) {
+                       ExecutionVertex target = 
graph.getAllVertices().get(targetVertexId).getTaskVertices()[i];
+                       Iterator<TaskManagerLocation> preference = 
target.getPreferredLocations().iterator();
+
+                       assertTrue(preference.hasNext());
+                       assertEquals(locations[i], preference.next());
+                       assertFalse(preference.hasNext());
+               }
+       }
+
+       /**
+        * This test validates that vertices with too many input streams do not 
have a location
+        * preference any more.
+        */
+       @Test
+       public void testNoLocalityInputLargeAllToAll() throws Exception {
+               final int parallelism = 100;
+
+               final ExecutionGraph graph = createTestGraph(parallelism, true);
+
+               // set the location for all sources to a distinct location
+               for (int i = 0; i < parallelism; i++) {
+                       ExecutionVertex source = 
graph.getAllVertices().get(sourceVertexId).getTaskVertices()[i];
+                       TaskManagerLocation location = new TaskManagerLocation(
+                                       ResourceID.generate(), 
InetAddress.getLoopbackAddress(), 10000 + i);
+                       initializeLocation(source, location);
+               }
+
+               // validate that the target vertices have no location preference
+               for (int i = 0; i < parallelism; i++) {
+                       ExecutionVertex target = 
graph.getAllVertices().get(targetVertexId).getTaskVertices()[i];
+
+                       Iterator<TaskManagerLocation> preference = 
target.getPreferredLocations().iterator();
+                       assertFalse(preference.hasNext());
+               }
+       }
+
+       /**
+        * This test validates that stateful vertices are scheduled based on the 
state's location
+        * (which is the prior execution's location).
+        */
+       @Test
+       public void testLocalityBasedOnState() throws Exception {
+               final int parallelism = 10;
+               final TaskManagerLocation[] locations = new 
TaskManagerLocation[parallelism];
+
+               final ExecutionGraph graph = createTestGraph(parallelism, 
false);
+
+               // set the location for all sources and targets
+               for (int i = 0; i < parallelism; i++) {
+                       ExecutionVertex source = 
graph.getAllVertices().get(sourceVertexId).getTaskVertices()[i];
+                       ExecutionVertex target = 
graph.getAllVertices().get(targetVertexId).getTaskVertices()[i];
+
+                       TaskManagerLocation randomLocation = new 
TaskManagerLocation(
+                                       ResourceID.generate(), 
InetAddress.getLoopbackAddress(), 10000 + i);
+                       
+                       TaskManagerLocation location = new TaskManagerLocation(
+                                       ResourceID.generate(), 
InetAddress.getLoopbackAddress(), 20000 + i);
+
+                       locations[i] = location;
+                       initializeLocation(source, randomLocation);
+                       initializeLocation(target, location);
+
+                       setState(source.getCurrentExecutionAttempt(), 
ExecutionState.CANCELED);
+                       setState(target.getCurrentExecutionAttempt(), 
ExecutionState.CANCELED);
+               }
+
+               // mimic a restart: all vertices get re-initialized without 
actually being executed
+               for (ExecutionJobVertex ejv : graph.getVerticesTopologically()) 
{
+                       ejv.resetForNewExecution();
+               }
+
+               // set new location for the sources and some state for the 
targets
+               for (int i = 0; i < parallelism; i++) {
+                       // source location
+                       ExecutionVertex source = 
graph.getAllVertices().get(sourceVertexId).getTaskVertices()[i];
+                       TaskManagerLocation randomLocation = new 
TaskManagerLocation(
+                                       ResourceID.generate(), 
InetAddress.getLoopbackAddress(), 30000 + i);
+                       initializeLocation(source, randomLocation);
+
+                       // target state
+                       ExecutionVertex target = 
graph.getAllVertices().get(targetVertexId).getTaskVertices()[i];
+                       
target.getCurrentExecutionAttempt().setInitialState(mock(TaskStateHandles.class));
+               }
+
+               // validate that the target vertices have the state's location 
as the location preference
+               for (int i = 0; i < parallelism; i++) {
+                       ExecutionVertex target = 
graph.getAllVertices().get(targetVertexId).getTaskVertices()[i];
+                       Iterator<TaskManagerLocation> preference = 
target.getPreferredLocations().iterator();
+
+                       assertTrue(preference.hasNext());
+                       assertEquals(locations[i], preference.next());
+                       assertFalse(preference.hasNext());
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Utilities
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Creates a simple 2 vertex graph with a parallel source and a 
parallel target.
+        */
+       private ExecutionGraph createTestGraph(int parallelism, boolean 
allToAll) throws Exception {
+
+               JobVertex source = new JobVertex("source", sourceVertexId);
+               source.setParallelism(parallelism);
+               source.setInvokableClass(NoOpInvokable.class);
+
+               JobVertex target = new JobVertex("source", targetVertexId);
+               target.setParallelism(parallelism);
+               target.setInvokableClass(NoOpInvokable.class);
+
+               DistributionPattern connectionPattern = allToAll ? 
DistributionPattern.ALL_TO_ALL : DistributionPattern.POINTWISE;
+               target.connectNewDataSetAsInput(source, connectionPattern);
+
+               JobGraph testJob = new JobGraph(jobId, "test job", source, 
target);
+
+               return ExecutionGraphBuilder.buildGraph(
+                               null,
+                               testJob,
+                               new Configuration(),
+                               Executors.directExecutor(),
+                               Executors.directExecutor(),
+                               getClass().getClassLoader(),
+                               new StandaloneCheckpointRecoveryFactory(),
+                               Time.of(10, TimeUnit.SECONDS),
+                               new FixedDelayRestartStrategy(10, 0L),
+                               new UnregisteredMetricsGroup(),
+                               1,
+                               log);
+       }
+
+       private void initializeLocation(ExecutionVertex vertex, 
TaskManagerLocation location) throws Exception {
+               // we need a bit of reflection magic to initialize the location 
without going through
+               // scheduling paths. we choose to do that, rather than the 
alternatives:
+               //  - mocking the scheduler created fragile tests that break 
whenever the scheduler is adjusted
+               //  - exposing test methods in the ExecutionVertex leads to 
undesirable setters 
+
+               AllocatedSlot slot = new AllocatedSlot(
+                               new AllocationID(), jobId, location, 0, 
ResourceProfile.UNKNOWN, mock(TaskManagerGateway.class));
+
+               SimpleSlot simpleSlot = new SimpleSlot(slot, 
mock(SlotOwner.class), 0);
+
+               final Field locationField = 
Execution.class.getDeclaredField("assignedResource");
+               locationField.setAccessible(true);
+
+               locationField.set(vertex.getCurrentExecutionAttempt(), 
simpleSlot);
+       }
+
+       private void setState(Execution execution, ExecutionState state) throws 
Exception {
+               final Field stateField = 
Execution.class.getDeclaredField("state");
+               stateField.setAccessible(true);
+
+               stateField.set(execution, state);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9ed4ff1/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
index b36de77..9e692ff 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
@@ -106,7 +106,7 @@ public class SchedulerTestUtils {
        public static Execution getTestVertex(Iterable<TaskManagerLocation> 
preferredLocations) {
                ExecutionVertex vertex = mock(ExecutionVertex.class);
                
-               
when(vertex.getPreferredLocations()).thenReturn(preferredLocations);
+               
when(vertex.getPreferredLocationsBasedOnInputs()).thenReturn(preferredLocations);
                when(vertex.getJobId()).thenReturn(new JobID());
                when(vertex.toString()).thenReturn("TEST-VERTEX");
                
@@ -119,7 +119,7 @@ public class SchedulerTestUtils {
        public static Execution getTestVertex(JobVertexID jid, int taskIndex, 
int numTasks) {
                ExecutionVertex vertex = mock(ExecutionVertex.class);
                
-               when(vertex.getPreferredLocations()).thenReturn(null);
+               
when(vertex.getPreferredLocationsBasedOnInputs()).thenReturn(null);
                when(vertex.getJobId()).thenReturn(new JobID());
                when(vertex.getJobvertexId()).thenReturn(jid);
                when(vertex.getParallelSubtaskIndex()).thenReturn(taskIndex);
@@ -139,7 +139,7 @@ public class SchedulerTestUtils {
 
                ExecutionVertex vertex = mock(ExecutionVertex.class);
 
-               
when(vertex.getPreferredLocations()).thenReturn(Arrays.asList(locations));
+               
when(vertex.getPreferredLocationsBasedOnInputs()).thenReturn(Arrays.asList(locations));
                when(vertex.getJobId()).thenReturn(new JobID());
                when(vertex.getJobvertexId()).thenReturn(jid);
                when(vertex.getParallelSubtaskIndex()).thenReturn(taskIndex);

Reply via email to