[FLINK-1478] [jobmanager] Scheduler support for external location constraints


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/970b2b7a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/970b2b7a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/970b2b7a

Branch: refs/heads/master
Commit: 970b2b7af85f62f525e697dfa1520e88c8fc5e51
Parents: a9ac7aa
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Feb 5 18:28:12 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Feb 5 18:50:40 2015 +0100

----------------------------------------------------------------------
 .../executiongraph/ExecutionJobVertex.java      |   9 +
 .../runtime/executiongraph/ExecutionVertex.java |  57 ++-
 .../runtime/jobmanager/scheduler/Scheduler.java | 119 +++--
 .../scheduler/SlotSharingGroupAssignment.java   |   7 +-
 .../executiongraph/ExecutionGraphTestUtils.java |   5 +-
 .../ExecutionVertexDeploymentTest.java          |  12 +-
 .../VertexLocationConstraintTest.java           | 436 +++++++++++++++++++
 7 files changed, 586 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/970b2b7a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 6676e85..0439c08 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -259,6 +259,15 @@ public class ExecutionJobVertex implements Serializable {
        
//---------------------------------------------------------------------------------------------
        
        public void scheduleAll(Scheduler scheduler, boolean queued) throws 
NoResourceAvailableException {
+               
+//             ExecutionVertex[] vertices = this.taskVertices;
+//             
+//             for (int i = 0; i < vertices.length; i++) {
+//                     ExecutionVertex v = vertices[i];
+//                     
+//                     if (v.get 
+//             }
+               
                for (ExecutionVertex ev : getTaskVertices()) {
                        ev.scheduleForExecution(scheduler, queued);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/970b2b7a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 05cea73..86173da 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -43,6 +43,7 @@ import scala.concurrent.duration.FiniteDuration;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -82,6 +83,11 @@ public class ExecutionVertex implements Serializable {
        
        private volatile Execution currentExecution;    // this field must 
never be null
        
+       
+       private volatile List<Instance> locationConstraintInstances;
+       
+       private volatile boolean scheduleLocalOnly;
+       
        // 
--------------------------------------------------------------------------------------------
 
        public ExecutionVertex(ExecutionJobVertex jobVertex, int subTaskIndex,
@@ -294,10 +300,22 @@ public class ExecutionVertex implements Serializable {
                }
        }
        
-       public void setTargetHostConstraint(String hostname) {
+       public void setLocationConstraintHosts(List<Instance> instances) {
+               this.locationConstraintInstances = instances;
+       }
+       
+       public void setScheduleLocalOnly(boolean scheduleLocalOnly) {
+               if (scheduleLocalOnly && inputEdges != null && 
inputEdges.length > 0) {
+                       throw new IllegalArgumentException("Strictly local 
scheduling is only supported for sources.");
+               }
                
+               this.scheduleLocalOnly = scheduleLocalOnly;
        }
 
+       public boolean isScheduleLocalOnly() {
+               return scheduleLocalOnly;
+       }
+       
        /**
         * Gets the location preferences of this task, determined by the 
locations of the predecessors from which
         * it receives input data.
@@ -307,23 +325,37 @@ public class ExecutionVertex implements Serializable {
         * @return The preferred locations for this vertex execution, or null, 
if there is no preference.
         */
        public Iterable<Instance> getPreferredLocations() {
-               HashSet<Instance> locations = new HashSet<Instance>();
+               // if we have hard location constraints, use those
+               {
+                       List<Instance> constraintInstances = 
this.locationConstraintInstances;
+                       if (constraintInstances != null && 
!constraintInstances.isEmpty()) {
+                               return constraintInstances;
+                       }
+               }
+               
+               // otherwise, base the preferred locations on the input 
connections
+               if (inputEdges == null) {
+                       return Collections.emptySet();
+               }
+               else {
+                       HashSet<Instance> locations = new HashSet<Instance>();
                
-               for (int i = 0; i < inputEdges.length; i++) {
-                       ExecutionEdge[] sources = inputEdges[i];
-                       if (sources != null) {
-                               for (int k = 0; k < sources.length; k++) {
-                                       SimpleSlot sourceSlot = 
sources[k].getSource().getProducer().getCurrentAssignedResource();
-                                       if (sourceSlot != null) {
-                                               
locations.add(sourceSlot.getInstance());
-                                               if (locations.size() > 
MAX_DISTINCT_LOCATIONS_TO_CONSIDER) {
-                                                       return null;
+                       for (int i = 0; i < inputEdges.length; i++) {
+                               ExecutionEdge[] sources = inputEdges[i];
+                               if (sources != null) {
+                                       for (int k = 0; k < sources.length; 
k++) {
+                                               SimpleSlot sourceSlot = 
sources[k].getSource().getProducer().getCurrentAssignedResource();
+                                               if (sourceSlot != null) {
+                                                       
locations.add(sourceSlot.getInstance());
+                                                       if (locations.size() > 
MAX_DISTINCT_LOCATIONS_TO_CONSIDER) {
+                                                               return null;
+                                                       }
                                                }
                                        }
                                }
                        }
+                       return locations;
                }
-               return locations;
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -407,6 +439,7 @@ public class ExecutionVertex implements Serializable {
                // clear the unnecessary fields in this class
                this.resultPartitions = null;
                this.inputEdges = null;
+               this.locationConstraintInstances = null;
        }
 
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/970b2b7a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index 1ad7a50..466e0e9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -29,6 +29,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import akka.dispatch.Futures;
+
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -72,6 +73,7 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener {
        
        private int nonLocalizedAssignments;
        
+       
        public Scheduler() {
                this.newlyAvailableInstances = new 
LinkedBlockingQueue<Instance>();
        }
@@ -164,6 +166,10 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener {
                }
 
                final ExecutionVertex vertex = 
task.getTaskToExecute().getVertex();
+               
+               final Iterable<Instance> preferredLocations = 
vertex.getPreferredLocations();
+               final boolean forceExternalLocation = 
vertex.isScheduleLocalOnly() &&
+                                                                       
preferredLocations != null && preferredLocations.iterator().hasNext();
        
                synchronized (globalLock) {
                
@@ -179,6 +185,12 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener {
                                final SlotSharingGroupAssignment assignment = 
sharingUnit.getTaskAssignment();
                                final CoLocationConstraint constraint = 
task.getLocationConstraint();
                                
+                               // sanity check that we do not use an 
externally forced location and a co-location constraint together
+                               if (constraint != null && 
forceExternalLocation) {
+                                       throw new IllegalArgumentException("The 
scheduling cannot be contrained simultaneously by a "
+                                                       + "co-location 
constriaint and an external location constraint.");
+                               }
+                               
                                // get a slot from the group, if the group has 
one for us (and can fulfill the constraint)
                                SimpleSlot slotFromGroup;
                                if (constraint == null) {
@@ -206,7 +218,7 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener {
                                                        
vertex.getPreferredLocations() : 
Collections.singleton(constraint.getLocation());
                                        
                                        // get a new slot, since we could not 
place it into the group, or we could not place it locally
-                                       newSlot = getFreeSubSlotForTask(vertex, 
locations, assignment, constraint);
+                                       newSlot = getFreeSubSlotForTask(vertex, 
locations, assignment, constraint, forceExternalLocation);
 
                                        SimpleSlot toUse;
                                        
@@ -214,8 +226,20 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener {
                                                if (slotFromGroup == null) {
                                                        // both null
                                                        if (constraint == null 
|| constraint.isUnassigned()) {
-                                                               throw new 
NoResourceAvailableException(task, getNumberOfAvailableInstances(), 
getTotalNumberOfSlots(), getNumberOfAvailableSlots());
-                                                       } else {
+                                                               if 
(forceExternalLocation) {
+                                                                       // 
could not satisfy the external location constraint
+                                                                       String 
hosts = getHostnamesFromInstances(preferredLocations);
+                                                                       throw 
new NoResourceAvailableException("Could not schedule task " + vertex
+                                                                               
        + " to any of the required hosts: " + hosts);
+                                                               }
+                                                               else {
+                                                                       // 
simply nothing is available
+                                                                       throw 
new NoResourceAvailableException(task, getNumberOfAvailableInstances(),
+                                                                               
        getTotalNumberOfSlots(), getNumberOfAvailableSlots());
+                                                               }
+                                                       }
+                                                       else {
+                                                               // nothing is 
available on the node where the co-location constraint pushes us
                                                                throw new 
NoResourceAvailableException("Could not allocate a slot on instance " + 
                                                                                
        constraint.getLocation() + ", as required by the co-location 
constraint.");
                                                        }
@@ -269,26 +293,49 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener {
                        }
                
                        // 2) === schedule without hints and sharing ===
-                       
-                       SimpleSlot slot = getFreeSlotForTask(vertex, 
vertex.getPreferredLocations());
-                       if (slot != null) {
-                               updateLocalityCounters(slot.getLocality());
-                               return slot;
-                       }
-                       else {
-                               // no resource available now, so queue the 
request
-                               if (queueIfNoResource) {
-                                       SlotAllocationFuture future = new 
SlotAllocationFuture();
-                                       this.taskQueue.add(new QueuedTask(task, 
future));
-                                       return future;
+                       {
+                               SimpleSlot slot = getFreeSlotForTask(vertex, 
preferredLocations, forceExternalLocation);
+                               if (slot != null) {
+                                       
updateLocalityCounters(slot.getLocality());
+                                       return slot;
                                }
                                else {
-                                       throw new 
NoResourceAvailableException(getNumberOfAvailableInstances(), 
getTotalNumberOfSlots(), getNumberOfAvailableSlots());
+                                       // no resource available now, so queue 
the request
+                                       if (queueIfNoResource) {
+                                               SlotAllocationFuture future = 
new SlotAllocationFuture();
+                                               this.taskQueue.add(new 
QueuedTask(task, future));
+                                               return future;
+                                       }
+                                       else if (forceExternalLocation) {
+                                               String hosts = 
getHostnamesFromInstances(preferredLocations);
+                                               throw new 
NoResourceAvailableException("Could not schedule task " + vertex
+                                                               + " to any of 
the required hosts: " + hosts);
+                                       }
+                                       else {
+                                               throw new 
NoResourceAvailableException(getNumberOfAvailableInstances(), 
getTotalNumberOfSlots(), getNumberOfAvailableSlots());
+                                       }
                                }
                        }
                }
        }
+       
+       private String getHostnamesFromInstances(Iterable<Instance> instances) {
+               StringBuilder bld = new StringBuilder();
+               
+               for (Instance i : instances) {
+                       bld.append(i.getInstanceConnectionInfo().getHostname());
+                       bld.append(", ");
+               }
                
+               if (bld.length() == 0) {
+                       return "";
+               }
+               else {
+                       bld.setLength(bld.length() - 2);
+                       return bld.toString();
+               }
+       }
+       
        /**
         * Gets a suitable instance to schedule the vertex execution to.
         * <p>
@@ -297,21 +344,21 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener {
         * @param vertex The task to run. 
         * @return The instance to run the vertex on, it {@code null}, if no 
instance is available.
         */
-       protected SimpleSlot getFreeSlotForTask(ExecutionVertex vertex, 
Iterable<Instance> requestedLocations) {
+       protected SimpleSlot getFreeSlotForTask(ExecutionVertex vertex, 
Iterable<Instance> requestedLocations, boolean localOnly) {
                
                // we need potentially to loop multiple times, because there 
may be false positives
                // in the set-with-available-instances
                while (true) {
-                       Pair<Instance, Locality> instanceLocalityPair = 
findInstance(requestedLocations);
+                       Pair<Instance, Locality> instanceLocalityPair = 
findInstance(requestedLocations, localOnly);
 
-                       if(instanceLocalityPair == null){
+                       if (instanceLocalityPair == null){
                                return null;
                        }
 
                        Instance instanceToUse = instanceLocalityPair.getLeft();
                        Locality locality = instanceLocalityPair.getRight();
 
-                       if(LOG.isDebugEnabled()){
+                       if (LOG.isDebugEnabled()){
                                if(locality == Locality.LOCAL){
                                        LOG.debug("Local assignment: " + 
vertex.getSimpleName() + " --> " + instanceToUse);
                                }else if(locality == Locality.NON_LOCAL){
@@ -348,25 +395,26 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener {
        protected SimpleSlot getFreeSubSlotForTask(ExecutionVertex vertex,
                                                                                
        Iterable<Instance> requestedLocations,
                                                                                
        SlotSharingGroupAssignment groupAssignment,
-                                                                               
        CoLocationConstraint constraint) {
+                                                                               
        CoLocationConstraint constraint,
+                                                                               
        boolean localOnly) {
                // we need potentially to loop multiple times, because there 
may be false positives
                // in the set-with-available-instances
                while (true) {
-                       Pair<Instance, Locality> instanceLocalityPair = 
findInstance(requestedLocations);
+                       Pair<Instance, Locality> instanceLocalityPair = 
findInstance(requestedLocations, localOnly);
 
-                       if(instanceLocalityPair == null){
+                       if (instanceLocalityPair == null) {
                                return null;
                        }
 
                        Instance instanceToUse = instanceLocalityPair.getLeft();
                        Locality locality = instanceLocalityPair.getRight();
 
-                       if(LOG.isDebugEnabled()){
-                               if(locality == Locality.LOCAL){
+                       if (LOG.isDebugEnabled()) {
+                               if (locality == Locality.LOCAL) {
                                        LOG.debug("Local assignment: " + 
vertex.getSimpleName() + " --> " + instanceToUse);
-                               }else if(locality == Locality.NON_LOCAL){
+                               } else if(locality == Locality.NON_LOCAL) {
                                        LOG.debug("Non-local assignment: " + 
vertex.getSimpleName() + " --> " + instanceToUse);
-                               }else if(locality == Locality.UNCONSTRAINED) {
+                               } else if(locality == Locality.UNCONSTRAINED) {
                                        LOG.debug("Unconstrained assignment: " 
+ vertex.getSimpleName() + " --> " + instanceToUse);
                                }
                        }
@@ -409,7 +457,8 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener {
         *
         * @param requestedLocations
         */
-       private Pair<Instance, Locality> findInstance(Iterable<Instance> 
requestedLocations){
+       private Pair<Instance, Locality> findInstance(Iterable<Instance> 
requestedLocations, boolean localOnly){
+               
                if (this.instancesWithAvailableResources.isEmpty()) {
                        // check if the asynchronous calls did not yet return 
the queues
                        Instance queuedInstance = 
this.newlyAvailableInstances.poll();
@@ -434,14 +483,18 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener {
                                if (location != null && 
this.instancesWithAvailableResources.remove(location)) {
                                        instanceToUse = location;
                                        locality = Locality.LOCAL;
-
                                        break;
                                }
                        }
 
                        if (instanceToUse == null) {
-                               instanceToUse = 
this.instancesWithAvailableResources.poll();
-                               locality = Locality.NON_LOCAL;
+                               if (localOnly) {
+                                       return null;
+                               }
+                               else {
+                                       instanceToUse = 
this.instancesWithAvailableResources.poll();
+                                       locality = Locality.NON_LOCAL;
+                               }
                        }
                }
                else {
@@ -603,8 +656,8 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener {
        public int getNumberOfAvailableInstances() {
                int numberAvailableInstances = 0;
                synchronized (this.globalLock) {
-                       for(Instance instance: allInstances){
-                               if(instance.isAlive()){
+                       for (Instance instance: allInstances ){
+                               if (instance.isAlive()){
                                        numberAvailableInstances++;
                                }
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/970b2b7a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroupAssignment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroupAssignment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroupAssignment.java
index 70d4510..4e0349a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroupAssignment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroupAssignment.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.jobmanager.scheduler;
 
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -41,13 +40,11 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.slf4j.Logger;
 
 
-public class SlotSharingGroupAssignment implements Serializable {
-
-       static final long serialVersionUID = 42L;
+public class SlotSharingGroupAssignment {
 
        private static final Logger LOG = Scheduler.LOG;
 
-       private transient final Object lock = new Object();
+       private final Object lock = new Object();
 
        /** All slots currently allocated to this sharing group */
        private final Set<SharedSlot> allSlots = new 
LinkedHashSet<SharedSlot>();

http://git-wip-us.apache.org/repos/asf/flink/blob/970b2b7a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 7d8229c..d136d6f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -101,9 +101,8 @@ public class ExecutionGraphTestUtils {
                return getInstance(taskManager, 1);
        }
 
-       public static Instance getInstance(final ActorRef taskManager, final 
int numberOfSlots) throws
-                       Exception {
-                               HardwareDescription hardwareDescription = new 
HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);
+       public static Instance getInstance(final ActorRef taskManager, final 
int numberOfSlots) throws Exception {
+               HardwareDescription hardwareDescription = new 
HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);
                InetAddress address = InetAddress.getByName("127.0.0.1");
                InstanceConnectionInfo connection = new 
InstanceConnectionInfo(address, 10001);
                

http://git-wip-us.apache.org/repos/asf/flink/blob/970b2b7a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index d8a7db3..f5089c2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -102,7 +102,7 @@ public class ExecutionVertexDeploymentTest {
 
                        final JobVertexID jid = new JobVertexID();
 
-                       final TestActorRef simpleTaskManager = 
TestActorRef.create(system,
+                       final TestActorRef<?> simpleTaskManager = 
TestActorRef.create(system,
                                        
Props.create(SimpleAcknowledgingTaskManager.class));
                        
                        final Instance instance = 
getInstance(simpleTaskManager);
@@ -146,7 +146,7 @@ public class ExecutionVertexDeploymentTest {
                try {
                        final JobVertexID jid = new JobVertexID();
 
-                       final TestActorRef simpleTaskManager = 
TestActorRef.create(system,
+                       final TestActorRef<?> simpleTaskManager = 
TestActorRef.create(system,
                                        
Props.create(SimpleAcknowledgingTaskManager.class));
                        
                        final Instance instance = 
getInstance(simpleTaskManager);
@@ -202,7 +202,7 @@ public class ExecutionVertexDeploymentTest {
 
                        final JobVertexID jid = new JobVertexID();
 
-                       final TestActorRef simpleTaskManager = 
TestActorRef.create(system,
+                       final TestActorRef<?> simpleTaskManager = 
TestActorRef.create(system,
                                        
Props.create(SimpleFailingTaskManager.class));
                        
                        final Instance instance = 
getInstance(simpleTaskManager);
@@ -238,7 +238,7 @@ public class ExecutionVertexDeploymentTest {
                try {
                        final JobVertexID jid = new JobVertexID();
 
-                       final TestActorRef simpleTaskManager = 
TestActorRef.create(system,
+                       final TestActorRef<?> simpleTaskManager = 
TestActorRef.create(system,
                                        
Props.create(SimpleFailingTaskManager.class));
                        
                        final Instance instance = 
getInstance(simpleTaskManager);
@@ -287,7 +287,7 @@ public class ExecutionVertexDeploymentTest {
 
                        TestingUtils.setExecutionContext(ec);
 
-                       final TestActorRef simpleTaskManager = 
TestActorRef.create(system,
+                       final TestActorRef<?> simpleTaskManager = 
TestActorRef.create(system,
                                        
Props.create(SimpleAcknowledgingTaskManager.class));
                        final Instance instance = 
getInstance(simpleTaskManager);
                        final SimpleSlot slot = instance.allocateSimpleSlot(new 
JobID());
@@ -337,7 +337,7 @@ public class ExecutionVertexDeploymentTest {
                                        AkkaUtils.getDefaultTimeout());
                        final ExecutionAttemptID eid = 
vertex.getCurrentExecutionAttempt().getAttemptId();
 
-                       final TestActorRef simpleTaskManager = 
TestActorRef.create(system, Props.create(new
+                       final TestActorRef<?> simpleTaskManager = 
TestActorRef.create(system, Props.create(new
                                        
ExecutionVertexCancelTest.CancelSequenceTaskManagerCreator(new
                                        TaskOperationResult(eid, false), new 
TaskOperationResult(eid, true))));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/970b2b7a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
new file mode 100644
index 0000000..cae10f4
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import java.net.InetAddress;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.runtime.instance.HardwareDescription;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import scala.concurrent.duration.FiniteDuration;
+import akka.actor.Actor;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
+
+public class VertexLocationConstraintTest {
+
+       private static final FiniteDuration timeout = new FiniteDuration(100, 
TimeUnit.SECONDS);
+       
+       private static ActorSystem system;
+       
+       private static TestActorRef<? extends Actor> taskManager;
+       
+       
+       @BeforeClass
+       public static void setup() {
+               system = ActorSystem.create("TestingActorSystem", 
TestingUtils.testConfig());
+               
+               taskManager = TestActorRef.create(system,
+                               
Props.create(ExecutionGraphTestUtils.SimpleAcknowledgingTaskManager.class));
+       }
+
+       @AfterClass
+       public static void teardown() {
+               JavaTestKit.shutdownActorSystem(system);
+               system = null;
+       }
+       
+       
+       @Test
+       public void testScheduleWithConstraint1() {
+               try {
+                       final byte[] address1 = { 10, 0, 1, 4 };
+                       final byte[] address2 = { 10, 0, 1, 5 };
+                       final byte[] address3 = { 10, 0, 1, 6 };
+                       
+                       final String hostname1 = "host1";
+                       final String hostname2 = "host2";
+                       final String hostname3 = "host3";
+                       
+                       // prepare the scheduler
+                       Instance instance1 = getInstance(address1, 6789, 
hostname1);
+                       Instance instance2 = getInstance(address2, 6789, 
hostname2);
+                       Instance instance3 = getInstance(address3, 6789, 
hostname3);
+                       
+                       Scheduler scheduler = new Scheduler();
+                       scheduler.newInstanceAvailable(instance1);
+                       scheduler.newInstanceAvailable(instance2);
+                       scheduler.newInstanceAvailable(instance3);
+                       
+                       // prepare the execution graph
+                       AbstractJobVertex jobVertex = new 
AbstractJobVertex("test vertex", new JobVertexID());
+                       jobVertex.setInvokableClass(DummyInvokable.class);
+                       jobVertex.setParallelism(2);
+                       JobGraph jg = new JobGraph("test job", jobVertex);
+                       
+                       ExecutionGraph eg = new ExecutionGraph(jg.getJobID(), 
jg.getName(), jg.getJobConfiguration(), timeout);
+                       eg.attachJobGraph(Collections.singletonList(jobVertex));
+                       
+                       ExecutionJobVertex ejv = 
eg.getAllVertices().get(jobVertex.getID());
+                       ExecutionVertex[] vertices = ejv.getTaskVertices();
+                       
+                       
vertices[0].setLocationConstraintHosts(Arrays.asList(instance1, instance2));
+                       
vertices[1].setLocationConstraintHosts(Collections.singletonList(instance3));
+                       
+                       vertices[0].setScheduleLocalOnly(true);
+                       vertices[1].setScheduleLocalOnly(true);
+                       
+                       ejv.scheduleAll(scheduler, false);
+                       
+                       SimpleSlot slot1 = 
vertices[0].getCurrentAssignedResource();
+                       SimpleSlot slot2 = 
vertices[1].getCurrentAssignedResource();
+                       
+                       assertNotNull(slot1);
+                       assertNotNull(slot2);
+                       
+                       Instance target1 = slot1.getInstance();
+                       Instance target2 = slot2.getInstance();
+                       
+                       assertNotNull(target1);
+                       assertNotNull(target2);
+                       
+                       assertTrue(target1 == instance1 || target1 == 
instance2);
+                       assertTrue(target2 == instance3);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testScheduleWithConstraint2() {
+               
+               // same test as above, which swapped host names to guard 
against "accidentally worked" because of
+               // the order in which requests are handles by internal data 
structures
+               
+               try {
+                       final byte[] address1 = { 10, 0, 1, 4 };
+                       final byte[] address2 = { 10, 0, 1, 5 };
+                       final byte[] address3 = { 10, 0, 1, 6 };
+                       
+                       final String hostname1 = "host1";
+                       final String hostname2 = "host2";
+                       final String hostname3 = "host3";
+                       
+                       // prepare the scheduler
+                       Instance instance1 = getInstance(address1, 6789, 
hostname1);
+                       Instance instance2 = getInstance(address2, 6789, 
hostname2);
+                       Instance instance3 = getInstance(address3, 6789, 
hostname3);
+                       
+                       Scheduler scheduler = new Scheduler();
+                       scheduler.newInstanceAvailable(instance1);
+                       scheduler.newInstanceAvailable(instance2);
+                       scheduler.newInstanceAvailable(instance3);
+                       
+                       // prepare the execution graph
+                       AbstractJobVertex jobVertex = new 
AbstractJobVertex("test vertex", new JobVertexID());
+                       jobVertex.setInvokableClass(DummyInvokable.class);
+                       jobVertex.setParallelism(2);
+                       JobGraph jg = new JobGraph("test job", jobVertex);
+                       
+                       ExecutionGraph eg = new ExecutionGraph(jg.getJobID(), 
jg.getName(), jg.getJobConfiguration(), timeout);
+                       eg.attachJobGraph(Collections.singletonList(jobVertex));
+                       
+                       ExecutionJobVertex ejv = 
eg.getAllVertices().get(jobVertex.getID());
+                       ExecutionVertex[] vertices = ejv.getTaskVertices();
+                       
+                       
vertices[0].setLocationConstraintHosts(Collections.singletonList(instance3));
+                       
vertices[1].setLocationConstraintHosts(Arrays.asList(instance1, instance2));
+                       
+                       vertices[0].setScheduleLocalOnly(true);
+                       vertices[1].setScheduleLocalOnly(true);
+                       
+                       ejv.scheduleAll(scheduler, false);
+                       
+                       SimpleSlot slot1 = 
vertices[0].getCurrentAssignedResource();
+                       SimpleSlot slot2 = 
vertices[1].getCurrentAssignedResource();
+                       
+                       assertNotNull(slot1);
+                       assertNotNull(slot2);
+                       
+                       Instance target1 = slot1.getInstance();
+                       Instance target2 = slot2.getInstance();
+                       
+                       assertNotNull(target1);
+                       assertNotNull(target2);
+                       
+                       assertTrue(target1 == instance3);
+                       assertTrue(target2 == instance1 || target2 == 
instance2);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testScheduleWithConstraintAndSlotSharing() {
+               try {
+                       final byte[] address1 = { 10, 0, 1, 4 };
+                       final byte[] address2 = { 10, 0, 1, 5 };
+                       final byte[] address3 = { 10, 0, 1, 6 };
+                       
+                       final String hostname1 = "host1";
+                       final String hostname2 = "host2";
+                       final String hostname3 = "host3";
+                       
+                       // prepare the scheduler
+                       Instance instance1 = getInstance(address1, 6789, 
hostname1);
+                       Instance instance2 = getInstance(address2, 6789, 
hostname2);
+                       Instance instance3 = getInstance(address3, 6789, 
hostname3);
+                       
+                       Scheduler scheduler = new Scheduler();
+                       scheduler.newInstanceAvailable(instance1);
+                       scheduler.newInstanceAvailable(instance2);
+                       scheduler.newInstanceAvailable(instance3);
+                       
+                       // prepare the execution graph
+                       AbstractJobVertex jobVertex1 = new 
AbstractJobVertex("v1", new JobVertexID());
+                       AbstractJobVertex jobVertex2 = new 
AbstractJobVertex("v2", new JobVertexID());
+                       jobVertex1.setInvokableClass(DummyInvokable.class);
+                       jobVertex2.setInvokableClass(DummyInvokable.class);
+                       jobVertex1.setParallelism(2);
+                       jobVertex2.setParallelism(3);
+                       
+                       SlotSharingGroup sharingGroup = new SlotSharingGroup();
+                       jobVertex1.setSlotSharingGroup(sharingGroup);
+                       jobVertex2.setSlotSharingGroup(sharingGroup);
+                       
+                       JobGraph jg = new JobGraph("test job", jobVertex1, 
jobVertex2);
+                       
+                       ExecutionGraph eg = new ExecutionGraph(jg.getJobID(), 
jg.getName(), jg.getJobConfiguration(), timeout);
+                       eg.attachJobGraph(Arrays.asList(jobVertex1, 
jobVertex2));
+                       
+                       ExecutionJobVertex ejv = 
eg.getAllVertices().get(jobVertex1.getID());
+                       ExecutionVertex[] vertices = ejv.getTaskVertices();
+                       
+                       
vertices[0].setLocationConstraintHosts(Arrays.asList(instance1, instance2));
+                       
vertices[1].setLocationConstraintHosts(Collections.singletonList(instance3));
+                       
+                       vertices[0].setScheduleLocalOnly(true);
+                       vertices[1].setScheduleLocalOnly(true);
+                       
+                       ejv.scheduleAll(scheduler, false);
+                       
+                       SimpleSlot slot1 = 
vertices[0].getCurrentAssignedResource();
+                       SimpleSlot slot2 = 
vertices[1].getCurrentAssignedResource();
+                       
+                       assertNotNull(slot1);
+                       assertNotNull(slot2);
+                       
+                       Instance target1 = slot1.getInstance();
+                       Instance target2 = slot2.getInstance();
+                       
+                       assertNotNull(target1);
+                       assertNotNull(target2);
+                       
+                       assertTrue(target1 == instance1 || target1 == 
instance2);
+                       assertTrue(target2 == instance3);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testScheduleWithUnfulfillableConstraint() {
+               
+               // same test as above, which swapped host names to guard 
against "accidentally worked" because of
+               // the order in which requests are handles by internal data 
structures
+               
+               try {
+                       final byte[] address1 = { 10, 0, 1, 4 };
+                       final byte[] address2 = { 10, 0, 1, 5 };
+                       
+                       final String hostname1 = "host1";
+                       final String hostname2 = "host2";
+                       
+                       // prepare the scheduler
+                       Instance instance1 = getInstance(address1, 6789, 
hostname1);
+                       Instance instance2 = getInstance(address2, 6789, 
hostname2);
+                       
+                       Scheduler scheduler = new Scheduler();
+                       scheduler.newInstanceAvailable(instance1);
+                       
+                       // prepare the execution graph
+                       AbstractJobVertex jobVertex = new 
AbstractJobVertex("test vertex", new JobVertexID());
+                       jobVertex.setInvokableClass(DummyInvokable.class);
+                       jobVertex.setParallelism(1);
+                       JobGraph jg = new JobGraph("test job", jobVertex);
+                       
+                       ExecutionGraph eg = new ExecutionGraph(jg.getJobID(), 
jg.getName(), jg.getJobConfiguration(), timeout);
+                       eg.attachJobGraph(Collections.singletonList(jobVertex));
+                       
+                       ExecutionJobVertex ejv = 
eg.getAllVertices().get(jobVertex.getID());
+                       ExecutionVertex[] vertices = ejv.getTaskVertices();
+                       
+                       
vertices[0].setLocationConstraintHosts(Collections.singletonList(instance2));
+                       vertices[0].setScheduleLocalOnly(true);
+                       
+                       try {
+                               ejv.scheduleAll(scheduler, false);
+                               fail("This should fail with a 
NoResourceAvailableException");
+                       }
+                       catch (NoResourceAvailableException e) {
+                               // bam! we are good...
+                               assertTrue(e.getMessage().contains(hostname2));
+                       }
+                       catch (Exception e) {
+                               fail("Wrong exception type");
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testScheduleWithUnfulfillableConstraintInSharingGroup() {
+               
+               // same test as above, which swapped host names to guard 
against "accidentally worked" because of
+               // the order in which requests are handles by internal data 
structures
+               
+               try {
+                       final byte[] address1 = { 10, 0, 1, 4 };
+                       final byte[] address2 = { 10, 0, 1, 5 };
+                       
+                       final String hostname1 = "host1";
+                       final String hostname2 = "host2";
+                       
+                       // prepare the scheduler
+                       Instance instance1 = getInstance(address1, 6789, 
hostname1);
+                       Instance instance2 = getInstance(address2, 6789, 
hostname2);
+                       
+                       Scheduler scheduler = new Scheduler();
+                       scheduler.newInstanceAvailable(instance1);
+                       
+                       // prepare the execution graph
+                       AbstractJobVertex jobVertex1 = new 
AbstractJobVertex("v1", new JobVertexID());
+                       AbstractJobVertex jobVertex2 = new 
AbstractJobVertex("v2", new JobVertexID());
+                       
+                       jobVertex1.setInvokableClass(DummyInvokable.class);
+                       jobVertex2.setInvokableClass(DummyInvokable.class);
+                       
+                       jobVertex1.setParallelism(1);
+                       jobVertex2.setParallelism(1);
+                       
+                       JobGraph jg = new JobGraph("test job", jobVertex1, 
jobVertex2);
+                       
+                       SlotSharingGroup sharingGroup = new SlotSharingGroup();
+                       jobVertex1.setSlotSharingGroup(sharingGroup);
+                       jobVertex2.setSlotSharingGroup(sharingGroup);
+                       
+                       ExecutionGraph eg = new ExecutionGraph(jg.getJobID(), 
jg.getName(), jg.getJobConfiguration(), timeout);
+                       eg.attachJobGraph(Arrays.asList(jobVertex1, 
jobVertex2));
+                       
+                       ExecutionJobVertex ejv = 
eg.getAllVertices().get(jobVertex1.getID());
+                       ExecutionVertex[] vertices = ejv.getTaskVertices();
+                       
+                       
vertices[0].setLocationConstraintHosts(Collections.singletonList(instance2));
+                       vertices[0].setScheduleLocalOnly(true);
+                       
+                       try {
+                               ejv.scheduleAll(scheduler, false);
+                               fail("This should fail with a 
NoResourceAvailableException");
+                       }
+                       catch (NoResourceAvailableException e) {
+                               // bam! we are good...
+                               assertTrue(e.getMessage().contains(hostname2));
+                       }
+                       catch (Exception e) {
+                               fail("Wrong exception type");
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testArchivingClearsFields() {
+               try {
+                       AbstractJobVertex vertex = new AbstractJobVertex("test 
vertex", new JobVertexID());
+                       JobGraph jg = new JobGraph("test job", vertex);
+                       
+                       ExecutionGraph eg = new ExecutionGraph(jg.getJobID(), 
jg.getName(), jg.getJobConfiguration(), timeout);
+                       eg.attachJobGraph(Collections.singletonList(vertex));
+                       
+                       ExecutionVertex ev = 
eg.getAllVertices().get(vertex.getID()).getTaskVertices()[0];
+                       
+                       Instance instance = 
ExecutionGraphTestUtils.getInstance(ActorRef.noSender());
+                       
ev.setLocationConstraintHosts(Collections.singletonList(instance));
+                       
+                       assertNotNull(ev.getPreferredLocations());
+                       assertEquals(instance, 
ev.getPreferredLocations().iterator().next());
+                       
+                       // transition to a final state
+                       eg.fail(new Exception());
+                       
+                       eg.prepareForArchiving();
+                       
+                       assertTrue(ev.getPreferredLocations() == null || 
!ev.getPreferredLocations().iterator().hasNext());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       
+       public static Instance getInstance(byte[] ipAddress, int dataPort, 
String hostname) throws Exception {
+               HardwareDescription hardwareDescription = new 
HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);
+               
+               InstanceConnectionInfo connection = 
mock(InstanceConnectionInfo.class);
+               
when(connection.address()).thenReturn(InetAddress.getByAddress(ipAddress));
+               when(connection.dataPort()).thenReturn(dataPort);
+               
when(connection.getInetAdress()).thenReturn(InetAddress.getByAddress(ipAddress).toString());
+               when(connection.getHostname()).thenReturn(hostname);
+               when(connection.getFQDNHostname()).thenReturn(hostname);
+               
+               return new Instance(taskManager, connection, new InstanceID(), 
hardwareDescription, 1);
+       }
+}

Reply via email to