[FLINK-1478] [jobmanager] Scheduler support for external location constraints
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/970b2b7a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/970b2b7a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/970b2b7a Branch: refs/heads/master Commit: 970b2b7af85f62f525e697dfa1520e88c8fc5e51 Parents: a9ac7aa Author: Stephan Ewen <se...@apache.org> Authored: Thu Feb 5 18:28:12 2015 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Thu Feb 5 18:50:40 2015 +0100 ---------------------------------------------------------------------- .../executiongraph/ExecutionJobVertex.java | 9 + .../runtime/executiongraph/ExecutionVertex.java | 57 ++- .../runtime/jobmanager/scheduler/Scheduler.java | 119 +++-- .../scheduler/SlotSharingGroupAssignment.java | 7 +- .../executiongraph/ExecutionGraphTestUtils.java | 5 +- .../ExecutionVertexDeploymentTest.java | 12 +- .../VertexLocationConstraintTest.java | 436 +++++++++++++++++++ 7 files changed, 586 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/970b2b7a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java index 6676e85..0439c08 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java @@ -259,6 +259,15 @@ public class ExecutionJobVertex implements Serializable { //--------------------------------------------------------------------------------------------- public void scheduleAll(Scheduler scheduler, boolean queued) throws
NoResourceAvailableException { + + // Schedule every parallel subtask of this job vertex. + // External location constraints (setLocationConstraintHosts / setScheduleLocalOnly) + // are stored on each ExecutionVertex and are read by the Scheduler through + // getPreferredLocations() and isScheduleLocalOnly(), so this loop does not filter + // or reorder the subtasks itself. + // NOTE(review): if `queued` reaches the Scheduler as queueIfNoResource, an unplaceable + // local-only subtask is queued there; confirm queued requests honor the local-only flag. + for (ExecutionVertex ev : getTaskVertices()) { ev.scheduleForExecution(scheduler, queued); } http://git-wip-us.apache.org/repos/asf/flink/blob/970b2b7a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index 05cea73..86173da 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -43,6 +43,7 @@ import scala.concurrent.duration.FiniteDuration; import java.io.Serializable; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; @@ -82,6 +83,11 @@ public class ExecutionVertex implements Serializable { private volatile Execution currentExecution; // this field must never be null + + private volatile List<Instance> locationConstraintInstances; + + private volatile boolean scheduleLocalOnly; + // -------------------------------------------------------------------------------------------- public ExecutionVertex(ExecutionJobVertex jobVertex, int subTaskIndex, @@ -294,10 +300,22 @@ public class ExecutionVertex implements Serializable { } } - 
public void setTargetHostConstraint(String hostname) { + public void setLocationConstraintHosts(List<Instance> instances) { + this.locationConstraintInstances = instances; + } + + public void setScheduleLocalOnly(boolean scheduleLocalOnly) { + if (scheduleLocalOnly && inputEdges != null &&
inputEdges.length > 0) { + throw new IllegalArgumentException("Strictly local scheduling is only supported for sources."); + } + this.scheduleLocalOnly = scheduleLocalOnly; } + public boolean isScheduleLocalOnly() { + return scheduleLocalOnly; + } + /** * Gets the location preferences of this task, determined by the locations of the predecessors from which * it receives input data. @@ -307,23 +325,37 @@ public class ExecutionVertex implements Serializable { * @return The preferred locations for this vertex execution, or null, if there is no preference. */ public Iterable<Instance> getPreferredLocations() { - HashSet<Instance> locations = new HashSet<Instance>(); + // if we have hard location constraints, use those + { + List<Instance> constraintInstances = this.locationConstraintInstances; + if (constraintInstances != null && !constraintInstances.isEmpty()) { + return constraintInstances; + } + } + + // otherwise, base the preferred locations on the input connections + if (inputEdges == null) { + return Collections.emptySet(); + } + else { + HashSet<Instance> locations = new HashSet<Instance>(); - for (int i = 0; i < inputEdges.length; i++) { - ExecutionEdge[] sources = inputEdges[i]; - if (sources != null) { - for (int k = 0; k < sources.length; k++) { - SimpleSlot sourceSlot = sources[k].getSource().getProducer().getCurrentAssignedResource(); - if (sourceSlot != null) { - locations.add(sourceSlot.getInstance()); - if (locations.size() > MAX_DISTINCT_LOCATIONS_TO_CONSIDER) { - return null; + for (int i = 0; i < inputEdges.length; i++) { + ExecutionEdge[] sources = inputEdges[i]; + if (sources != null) { + for (int k = 0; k < sources.length; k++) { + SimpleSlot sourceSlot = sources[k].getSource().getProducer().getCurrentAssignedResource(); + if (sourceSlot != null) { + locations.add(sourceSlot.getInstance()); + if (locations.size() > MAX_DISTINCT_LOCATIONS_TO_CONSIDER) { + return null; + } } } } } + return locations; } - return locations; } // 
-------------------------------------------------------------------------------------------- @@ -407,6 +439,7 @@ public class ExecutionVertex implements Serializable { // clear the unnecessary fields in this class this.resultPartitions = null; this.inputEdges = null; + this.locationConstraintInstances = null; } // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/970b2b7a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java index 1ad7a50..466e0e9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java @@ -29,6 +29,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.LinkedBlockingQueue; import akka.dispatch.Futures; + import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; import org.apache.flink.runtime.akka.AkkaUtils; @@ -72,6 +73,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener { private int nonLocalizedAssignments; + public Scheduler() { this.newlyAvailableInstances = new LinkedBlockingQueue<Instance>(); } @@ -164,6 +166,10 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener { } final ExecutionVertex vertex = task.getTaskToExecute().getVertex(); + + final Iterable<Instance> preferredLocations = vertex.getPreferredLocations(); + final boolean forceExternalLocation = vertex.isScheduleLocalOnly() && + preferredLocations != null && preferredLocations.iterator().hasNext(); synchronized (globalLock) { @@ -179,6 +185,12 @@
public class Scheduler implements InstanceListener, SlotAvailabilityListener { final SlotSharingGroupAssignment assignment = sharingUnit.getTaskAssignment(); final CoLocationConstraint constraint = task.getLocationConstraint(); + // sanity check that we do not use an externally forced location and a co-location constraint together + if (constraint != null && forceExternalLocation) { + throw new IllegalArgumentException("The scheduling cannot be constrained simultaneously by a " + + "co-location constraint and an external location constraint."); + } + // get a slot from the group, if the group has one for us (and can fulfill the constraint) SimpleSlot slotFromGroup; if (constraint == null) { @@ -206,7 +218,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener { vertex.getPreferredLocations() : Collections.singleton(constraint.getLocation()); // get a new slot, since we could not place it into the group, or we could not place it locally - newSlot = getFreeSubSlotForTask(vertex, locations, assignment, constraint); + newSlot = getFreeSubSlotForTask(vertex, locations, assignment, constraint, forceExternalLocation); SimpleSlot toUse; @@ -214,8 +226,20 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener { if (slotFromGroup == null) { // both null if (constraint == null || constraint.isUnassigned()) { - throw new NoResourceAvailableException(task, getNumberOfAvailableInstances(), getTotalNumberOfSlots(), getNumberOfAvailableSlots()); - } else { + if (forceExternalLocation) { + // could not satisfy the external location constraint + String hosts = getHostnamesFromInstances(preferredLocations); + throw new NoResourceAvailableException("Could not schedule task " + vertex + + " to any of the required hosts: " + hosts); + } + else { + // simply nothing is available + throw new NoResourceAvailableException(task, getNumberOfAvailableInstances(), + getTotalNumberOfSlots(), getNumberOfAvailableSlots()); + } + } + else { + // 
nothing is available on the node where the co-location constraint pushes us throw new NoResourceAvailableException("Could not allocate a slot on instance " + constraint.getLocation() + ", as required by the co-location constraint."); } @@ -269,26 +293,49 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener { } // 2) === schedule without hints and sharing === - - SimpleSlot slot = getFreeSlotForTask(vertex, vertex.getPreferredLocations()); - if (slot != null) { - updateLocalityCounters(slot.getLocality()); - return slot; - } - else { - // no resource available now, so queue the request - if (queueIfNoResource) { - SlotAllocationFuture future = new SlotAllocationFuture(); - this.taskQueue.add(new QueuedTask(task, future)); - return future; + { + SimpleSlot slot = getFreeSlotForTask(vertex, preferredLocations, forceExternalLocation); + if (slot != null) { + updateLocalityCounters(slot.getLocality()); + return slot; } else { - throw new NoResourceAvailableException(getNumberOfAvailableInstances(), getTotalNumberOfSlots(), getNumberOfAvailableSlots()); + // no resource available now, so queue the request + if (queueIfNoResource) { + SlotAllocationFuture future = new SlotAllocationFuture(); + this.taskQueue.add(new QueuedTask(task, future)); + return future; + } + else if (forceExternalLocation) { + String hosts = getHostnamesFromInstances(preferredLocations); + throw new NoResourceAvailableException("Could not schedule task " + vertex + + " to any of the required hosts: " + hosts); + } + else { + throw new NoResourceAvailableException(getNumberOfAvailableInstances(), getTotalNumberOfSlots(), getNumberOfAvailableSlots()); + } } } } } + + private String getHostnamesFromInstances(Iterable<Instance> instances) { + StringBuilder bld = new StringBuilder(); + + for (Instance i : instances) { + bld.append(i.getInstanceConnectionInfo().getHostname()); + bld.append(", "); + } + if (bld.length() == 0) { + return ""; + } + else { + 
bld.setLength(bld.length() - 2); + return bld.toString(); + } + } + /** * Gets a suitable instance to schedule the vertex execution to. * <p> @@ -297,21 +344,21 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener { * @param vertex The task to run. * @return The instance to run the vertex on, it {@code null}, if no instance is available. */ - protected SimpleSlot getFreeSlotForTask(ExecutionVertex vertex, Iterable<Instance> requestedLocations) { + protected SimpleSlot getFreeSlotForTask(ExecutionVertex vertex, Iterable<Instance> requestedLocations, boolean localOnly) { // we need potentially to loop multiple times, because there may be false positives // in the set-with-available-instances while (true) { - Pair<Instance, Locality> instanceLocalityPair = findInstance(requestedLocations); + Pair<Instance, Locality> instanceLocalityPair = findInstance(requestedLocations, localOnly); - if(instanceLocalityPair == null){ + if (instanceLocalityPair == null) { return null; } Instance instanceToUse = instanceLocalityPair.getLeft(); Locality locality = instanceLocalityPair.getRight(); - if(LOG.isDebugEnabled()){ + if (LOG.isDebugEnabled()) { if(locality == Locality.LOCAL){ LOG.debug("Local assignment: " + vertex.getSimpleName() + " --> " + instanceToUse); }else if(locality == Locality.NON_LOCAL){ @@ -348,25 +395,26 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener { protected SimpleSlot getFreeSubSlotForTask(ExecutionVertex vertex, Iterable<Instance> requestedLocations, SlotSharingGroupAssignment groupAssignment, - CoLocationConstraint constraint) { + CoLocationConstraint constraint, + boolean localOnly) { // we need potentially to loop multiple times, because there may be false positives // in the set-with-available-instances while (true) { - Pair<Instance, Locality> instanceLocalityPair = findInstance(requestedLocations); + Pair<Instance, Locality> instanceLocalityPair = findInstance(requestedLocations, localOnly); - 
if(instanceLocalityPair == null){ + if (instanceLocalityPair == null) { return null; } Instance instanceToUse = instanceLocalityPair.getLeft(); Locality locality = instanceLocalityPair.getRight(); - if(LOG.isDebugEnabled()){ - if(locality == Locality.LOCAL){ + if (LOG.isDebugEnabled()) { + if (locality == Locality.LOCAL) { LOG.debug("Local assignment: " + vertex.getSimpleName() + " --> " + instanceToUse); - }else if(locality == Locality.NON_LOCAL){ + } else if (locality == Locality.NON_LOCAL) { LOG.debug("Non-local assignment: " + vertex.getSimpleName() + " --> " + instanceToUse); - }else if(locality == Locality.UNCONSTRAINED) { + } else if (locality == Locality.UNCONSTRAINED) { LOG.debug("Unconstrained assignment: " + vertex.getSimpleName() + " --> " + instanceToUse); } } @@ -409,7 +457,8 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener { * * @param requestedLocations */ - private Pair<Instance, Locality> findInstance(Iterable<Instance> requestedLocations){ + private Pair<Instance, Locality> findInstance(Iterable<Instance> requestedLocations, boolean localOnly) { + if (this.instancesWithAvailableResources.isEmpty()) { // check if the asynchronous calls did not yet return the queues Instance queuedInstance = this.newlyAvailableInstances.poll(); @@ -434,14 +483,18 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener { if (location != null && this.instancesWithAvailableResources.remove(location)) { instanceToUse = location; locality = Locality.LOCAL; - break; } } if (instanceToUse == null) { - instanceToUse = this.instancesWithAvailableResources.poll(); - locality = Locality.NON_LOCAL; + if (localOnly) { + return null; + } + else { + instanceToUse = this.instancesWithAvailableResources.poll(); + locality = Locality.NON_LOCAL; + } } } else { @@ -603,8 +656,8 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener { public int getNumberOfAvailableInstances() { int numberAvailableInstances =
0; synchronized (this.globalLock) { - for(Instance instance: allInstances){ - if(instance.isAlive()){ + for (Instance instance : allInstances) { + if (instance.isAlive()) { numberAvailableInstances++; } } http://git-wip-us.apache.org/repos/asf/flink/blob/970b2b7a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroupAssignment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroupAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroupAssignment.java index 70d4510..4e0349a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroupAssignment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroupAssignment.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.jobmanager.scheduler; -import java.io.Serializable; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -41,13 +40,11 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.slf4j.Logger; -public class SlotSharingGroupAssignment implements Serializable { - - static final long serialVersionUID = 42L; +public class SlotSharingGroupAssignment { private static final Logger LOG = Scheduler.LOG; - private transient final Object lock = new Object(); + private final Object lock = new Object(); /** All slots currently allocated to this sharing group */ private final Set<SharedSlot> allSlots = new LinkedHashSet<SharedSlot>(); http://git-wip-us.apache.org/repos/asf/flink/blob/970b2b7a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java index 7d8229c..d136d6f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java @@ -101,9 +101,8 @@ public class ExecutionGraphTestUtils { return getInstance(taskManager, 1); } - public static Instance getInstance(final ActorRef taskManager, final int numberOfSlots) throws - Exception { - HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024); + public static Instance getInstance(final ActorRef taskManager, final int numberOfSlots) throws Exception { + HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024); InetAddress address = InetAddress.getByName("127.0.0.1"); InstanceConnectionInfo connection = new InstanceConnectionInfo(address, 10001); http://git-wip-us.apache.org/repos/asf/flink/blob/970b2b7a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java index d8a7db3..f5089c2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java @@ -102,7 +102,7 @@ public class ExecutionVertexDeploymentTest { final JobVertexID jid = new JobVertexID(); - final TestActorRef simpleTaskManager = TestActorRef.create(system, + final TestActorRef<?> simpleTaskManager = TestActorRef.create(system,
Props.create(SimpleAcknowledgingTaskManager.class)); final Instance instance = getInstance(simpleTaskManager); @@ -146,7 +146,7 @@ public class ExecutionVertexDeploymentTest { try { final JobVertexID jid = new JobVertexID(); - final TestActorRef simpleTaskManager = TestActorRef.create(system, + final TestActorRef<?> simpleTaskManager = TestActorRef.create(system, Props.create(SimpleAcknowledgingTaskManager.class)); final Instance instance = getInstance(simpleTaskManager); @@ -202,7 +202,7 @@ public class ExecutionVertexDeploymentTest { final JobVertexID jid = new JobVertexID(); - final TestActorRef simpleTaskManager = TestActorRef.create(system, + final TestActorRef<?> simpleTaskManager = TestActorRef.create(system, Props.create(SimpleFailingTaskManager.class)); final Instance instance = getInstance(simpleTaskManager); @@ -238,7 +238,7 @@ public class ExecutionVertexDeploymentTest { try { final JobVertexID jid = new JobVertexID(); - final TestActorRef simpleTaskManager = TestActorRef.create(system, + final TestActorRef<?> simpleTaskManager = TestActorRef.create(system, Props.create(SimpleFailingTaskManager.class)); final Instance instance = getInstance(simpleTaskManager); @@ -287,7 +287,7 @@ public class ExecutionVertexDeploymentTest { TestingUtils.setExecutionContext(ec); - final TestActorRef simpleTaskManager = TestActorRef.create(system, + final TestActorRef<?> simpleTaskManager = TestActorRef.create(system, Props.create(SimpleAcknowledgingTaskManager.class)); final Instance instance = getInstance(simpleTaskManager); final SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); @@ -337,7 +337,7 @@ public class ExecutionVertexDeploymentTest { AkkaUtils.getDefaultTimeout()); final ExecutionAttemptID eid = vertex.getCurrentExecutionAttempt().getAttemptId(); - final TestActorRef simpleTaskManager = TestActorRef.create(system, Props.create(new + final TestActorRef<?> simpleTaskManager = TestActorRef.create(system, Props.create(new
ExecutionVertexCancelTest.CancelSequenceTaskManagerCreator(new TaskOperationResult(eid, false), new TaskOperationResult(eid, true)))); http://git-wip-us.apache.org/repos/asf/flink/blob/970b2b7a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java new file mode 100644 index 0000000..cae10f4 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java @@ -0,0 +1,436 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.runtime.executiongraph; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +import java.net.InetAddress; +import java.util.Arrays; +import java.util.Collections; +import java.util.concurrent.TimeUnit; + +import org.apache.flink.runtime.instance.HardwareDescription; +import org.apache.flink.runtime.instance.Instance; +import org.apache.flink.runtime.instance.InstanceConnectionInfo; +import org.apache.flink.runtime.instance.InstanceID; +import org.apache.flink.runtime.instance.SimpleSlot; +import org.apache.flink.runtime.jobgraph.AbstractJobVertex; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; +import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.operators.testutils.DummyInvokable; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import scala.concurrent.duration.FiniteDuration; +import akka.actor.Actor; +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.testkit.JavaTestKit; +import akka.testkit.TestActorRef; + +public class VertexLocationConstraintTest { + + private static final FiniteDuration timeout = new FiniteDuration(100, TimeUnit.SECONDS); + + private static ActorSystem system; + + private static TestActorRef<? 
extends Actor> taskManager; + + + @BeforeClass + public static void setup() { + system = ActorSystem.create("TestingActorSystem", TestingUtils.testConfig()); + + taskManager = TestActorRef.create(system, + Props.create(ExecutionGraphTestUtils.SimpleAcknowledgingTaskManager.class)); + } + + @AfterClass + public static void teardown() { + JavaTestKit.shutdownActorSystem(system); + system = null; + } + + + @Test + public void testScheduleWithConstraint1() { + try { + final byte[] address1 = { 10, 0, 1, 4 }; + final byte[] address2 = { 10, 0, 1, 5 }; + final byte[] address3 = { 10, 0, 1, 6 }; + + final String hostname1 = "host1"; + final String hostname2 = "host2"; + final String hostname3 = "host3"; + + // prepare the scheduler + Instance instance1 = getInstance(address1, 6789, hostname1); + Instance instance2 = getInstance(address2, 6789, hostname2); + Instance instance3 = getInstance(address3, 6789, hostname3); + + Scheduler scheduler = new Scheduler(); + scheduler.newInstanceAvailable(instance1); + scheduler.newInstanceAvailable(instance2); + scheduler.newInstanceAvailable(instance3); + + // prepare the execution graph + AbstractJobVertex jobVertex = new AbstractJobVertex("test vertex", new JobVertexID()); + jobVertex.setInvokableClass(DummyInvokable.class); + jobVertex.setParallelism(2); + JobGraph jg = new JobGraph("test job", jobVertex); + + ExecutionGraph eg = new ExecutionGraph(jg.getJobID(), jg.getName(), jg.getJobConfiguration(), timeout); + eg.attachJobGraph(Collections.singletonList(jobVertex)); + + ExecutionJobVertex ejv = eg.getAllVertices().get(jobVertex.getID()); + ExecutionVertex[] vertices = ejv.getTaskVertices(); + + vertices[0].setLocationConstraintHosts(Arrays.asList(instance1, instance2)); + vertices[1].setLocationConstraintHosts(Collections.singletonList(instance3)); + + vertices[0].setScheduleLocalOnly(true); + vertices[1].setScheduleLocalOnly(true); + + ejv.scheduleAll(scheduler, false); + + SimpleSlot slot1 = 
vertices[0].getCurrentAssignedResource(); + SimpleSlot slot2 = vertices[1].getCurrentAssignedResource(); + + assertNotNull(slot1); + assertNotNull(slot2); + + Instance target1 = slot1.getInstance(); + Instance target2 = slot2.getInstance(); + + assertNotNull(target1); + assertNotNull(target2); + + assertTrue(target1 == instance1 || target1 == instance2); + assertTrue(target2 == instance3); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testScheduleWithConstraint2() { + + // same test as above, which swapped host names to guard against "accidentally worked" because of + // the order in which requests are handles by internal data structures + + try { + final byte[] address1 = { 10, 0, 1, 4 }; + final byte[] address2 = { 10, 0, 1, 5 }; + final byte[] address3 = { 10, 0, 1, 6 }; + + final String hostname1 = "host1"; + final String hostname2 = "host2"; + final String hostname3 = "host3"; + + // prepare the scheduler + Instance instance1 = getInstance(address1, 6789, hostname1); + Instance instance2 = getInstance(address2, 6789, hostname2); + Instance instance3 = getInstance(address3, 6789, hostname3); + + Scheduler scheduler = new Scheduler(); + scheduler.newInstanceAvailable(instance1); + scheduler.newInstanceAvailable(instance2); + scheduler.newInstanceAvailable(instance3); + + // prepare the execution graph + AbstractJobVertex jobVertex = new AbstractJobVertex("test vertex", new JobVertexID()); + jobVertex.setInvokableClass(DummyInvokable.class); + jobVertex.setParallelism(2); + JobGraph jg = new JobGraph("test job", jobVertex); + + ExecutionGraph eg = new ExecutionGraph(jg.getJobID(), jg.getName(), jg.getJobConfiguration(), timeout); + eg.attachJobGraph(Collections.singletonList(jobVertex)); + + ExecutionJobVertex ejv = eg.getAllVertices().get(jobVertex.getID()); + ExecutionVertex[] vertices = ejv.getTaskVertices(); + + vertices[0].setLocationConstraintHosts(Collections.singletonList(instance3)); + 
vertices[1].setLocationConstraintHosts(Arrays.asList(instance1, instance2)); + + vertices[0].setScheduleLocalOnly(true); + vertices[1].setScheduleLocalOnly(true); + + ejv.scheduleAll(scheduler, false); + + SimpleSlot slot1 = vertices[0].getCurrentAssignedResource(); + SimpleSlot slot2 = vertices[1].getCurrentAssignedResource(); + + assertNotNull(slot1); + assertNotNull(slot2); + + Instance target1 = slot1.getInstance(); + Instance target2 = slot2.getInstance(); + + assertNotNull(target1); + assertNotNull(target2); + + assertTrue(target1 == instance3); + assertTrue(target2 == instance1 || target2 == instance2); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testScheduleWithConstraintAndSlotSharing() { + try { + final byte[] address1 = { 10, 0, 1, 4 }; + final byte[] address2 = { 10, 0, 1, 5 }; + final byte[] address3 = { 10, 0, 1, 6 }; + + final String hostname1 = "host1"; + final String hostname2 = "host2"; + final String hostname3 = "host3"; + + // prepare the scheduler + Instance instance1 = getInstance(address1, 6789, hostname1); + Instance instance2 = getInstance(address2, 6789, hostname2); + Instance instance3 = getInstance(address3, 6789, hostname3); + + Scheduler scheduler = new Scheduler(); + scheduler.newInstanceAvailable(instance1); + scheduler.newInstanceAvailable(instance2); + scheduler.newInstanceAvailable(instance3); + + // prepare the execution graph + AbstractJobVertex jobVertex1 = new AbstractJobVertex("v1", new JobVertexID()); + AbstractJobVertex jobVertex2 = new AbstractJobVertex("v2", new JobVertexID()); + jobVertex1.setInvokableClass(DummyInvokable.class); + jobVertex2.setInvokableClass(DummyInvokable.class); + jobVertex1.setParallelism(2); + jobVertex2.setParallelism(3); + + SlotSharingGroup sharingGroup = new SlotSharingGroup(); + jobVertex1.setSlotSharingGroup(sharingGroup); + jobVertex2.setSlotSharingGroup(sharingGroup); + + JobGraph jg = new JobGraph("test job", jobVertex1, 
jobVertex2); + + ExecutionGraph eg = new ExecutionGraph(jg.getJobID(), jg.getName(), jg.getJobConfiguration(), timeout); + eg.attachJobGraph(Arrays.asList(jobVertex1, jobVertex2)); + + ExecutionJobVertex ejv = eg.getAllVertices().get(jobVertex1.getID()); + ExecutionVertex[] vertices = ejv.getTaskVertices(); + + vertices[0].setLocationConstraintHosts(Arrays.asList(instance1, instance2)); + vertices[1].setLocationConstraintHosts(Collections.singletonList(instance3)); + + vertices[0].setScheduleLocalOnly(true); + vertices[1].setScheduleLocalOnly(true); + + ejv.scheduleAll(scheduler, false); + + SimpleSlot slot1 = vertices[0].getCurrentAssignedResource(); + SimpleSlot slot2 = vertices[1].getCurrentAssignedResource(); + + assertNotNull(slot1); + assertNotNull(slot2); + + Instance target1 = slot1.getInstance(); + Instance target2 = slot2.getInstance(); + + assertNotNull(target1); + assertNotNull(target2); + + assertTrue(target1 == instance1 || target1 == instance2); + assertTrue(target2 == instance3); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testScheduleWithUnfulfillableConstraint() { + + // the vertex is constrained to instance2, which is never registered with the scheduler, + // so local-only scheduling cannot be satisfied and must fail with a NoResourceAvailableException + + try { + final byte[] address1 = { 10, 0, 1, 4 }; + final byte[] address2 = { 10, 0, 1, 5 }; + + final String hostname1 = "host1"; + final String hostname2 = "host2"; + + // prepare the scheduler + Instance instance1 = getInstance(address1, 6789, hostname1); + Instance instance2 = getInstance(address2, 6789, hostname2); + + Scheduler scheduler = new Scheduler(); + scheduler.newInstanceAvailable(instance1); + + // prepare the execution graph + AbstractJobVertex jobVertex = new AbstractJobVertex("test vertex", new JobVertexID()); + jobVertex.setInvokableClass(DummyInvokable.class); + jobVertex.setParallelism(1); + JobGraph jg = new JobGraph("test job", 
jobVertex); + + ExecutionGraph eg = new ExecutionGraph(jg.getJobID(), jg.getName(), jg.getJobConfiguration(), timeout); + eg.attachJobGraph(Collections.singletonList(jobVertex)); + + ExecutionJobVertex ejv = eg.getAllVertices().get(jobVertex.getID()); + ExecutionVertex[] vertices = ejv.getTaskVertices(); + + vertices[0].setLocationConstraintHosts(Collections.singletonList(instance2)); + vertices[0].setScheduleLocalOnly(true); + + try { + ejv.scheduleAll(scheduler, false); + fail("This should fail with a NoResourceAvailableException"); + } + catch (NoResourceAvailableException e) { + // bam! we are good... the message must name the unreachable host + assertTrue(e.getMessage().contains(hostname2)); + } + catch (Exception e) { + // print the unexpected exception so the failure is diagnosable + e.printStackTrace(); + fail("Wrong exception type"); + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testScheduleWithUnfulfillableConstraintInSharingGroup() { + + // same setup as testScheduleWithUnfulfillableConstraint, but with both job vertices in one slot + // sharing group, so the unfulfillable constraint must also be detected when slot sharing is used + + try { + final byte[] address1 = { 10, 0, 1, 4 }; + final byte[] address2 = { 10, 0, 1, 5 }; + + final String hostname1 = "host1"; + final String hostname2 = "host2"; + + // prepare the scheduler + Instance instance1 = getInstance(address1, 6789, hostname1); + Instance instance2 = getInstance(address2, 6789, hostname2); + + Scheduler scheduler = new Scheduler(); + scheduler.newInstanceAvailable(instance1); + + // prepare the execution graph + AbstractJobVertex jobVertex1 = new AbstractJobVertex("v1", new JobVertexID()); + AbstractJobVertex jobVertex2 = new AbstractJobVertex("v2", new JobVertexID()); + + jobVertex1.setInvokableClass(DummyInvokable.class); + jobVertex2.setInvokableClass(DummyInvokable.class); + + jobVertex1.setParallelism(1); + jobVertex2.setParallelism(1); + + JobGraph jg = new JobGraph("test job", jobVertex1, jobVertex2); + + SlotSharingGroup sharingGroup = new SlotSharingGroup(); + 
jobVertex1.setSlotSharingGroup(sharingGroup); + jobVertex2.setSlotSharingGroup(sharingGroup); + + ExecutionGraph eg = new ExecutionGraph(jg.getJobID(), jg.getName(), jg.getJobConfiguration(), timeout); + eg.attachJobGraph(Arrays.asList(jobVertex1, jobVertex2)); + + ExecutionJobVertex ejv = eg.getAllVertices().get(jobVertex1.getID()); + ExecutionVertex[] vertices = ejv.getTaskVertices(); + + vertices[0].setLocationConstraintHosts(Collections.singletonList(instance2)); + vertices[0].setScheduleLocalOnly(true); + + try { + ejv.scheduleAll(scheduler, false); + fail("This should fail with a NoResourceAvailableException"); + } + catch (NoResourceAvailableException e) { + // bam! we are good... the message must name the unreachable host + assertTrue(e.getMessage().contains(hostname2)); + } + catch (Exception e) { + // print the unexpected exception so the failure is diagnosable + e.printStackTrace(); + fail("Wrong exception type"); + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testArchivingClearsFields() { + try { + AbstractJobVertex vertex = new AbstractJobVertex("test vertex", new JobVertexID()); + JobGraph jg = new JobGraph("test job", vertex); + + ExecutionGraph eg = new ExecutionGraph(jg.getJobID(), jg.getName(), jg.getJobConfiguration(), timeout); + eg.attachJobGraph(Collections.singletonList(vertex)); + + ExecutionVertex ev = eg.getAllVertices().get(vertex.getID()).getTaskVertices()[0]; + + Instance instance = ExecutionGraphTestUtils.getInstance(ActorRef.noSender()); + ev.setLocationConstraintHosts(Collections.singletonList(instance)); + + assertNotNull(ev.getPreferredLocations()); + assertEquals(instance, ev.getPreferredLocations().iterator().next()); + + // transition to a final state + eg.fail(new Exception()); + + // archiving must drop the location constraint hosts set above + eg.prepareForArchiving(); + + assertTrue(ev.getPreferredLocations() == null || !ev.getPreferredLocations().iterator().hasNext()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + // -------------------------------------------------------------------------------------------- + + 
public static Instance getInstance(byte[] ipAddress, int dataPort, String hostname) throws Exception { + HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024); + + InstanceConnectionInfo connection = mock(InstanceConnectionInfo.class); + when(connection.address()).thenReturn(InetAddress.getByAddress(ipAddress)); + when(connection.dataPort()).thenReturn(dataPort); + when(connection.getInetAdress()).thenReturn(InetAddress.getByAddress(ipAddress).toString()); + when(connection.getHostname()).thenReturn(hostname); + when(connection.getFQDNHostname()).thenReturn(hostname); + + return new Instance(taskManager, connection, new InstanceID(), hardwareDescription, 1); + } +}