Repository: aurora Updated Branches: refs/heads/master 952ef6db3 -> a80260eaf
Accept resource offers from multiple framework roles. Bugs closed: AURORA-1109 Reviewed at https://reviews.apache.org/r/42126/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/a80260ea Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/a80260ea Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/a80260ea Branch: refs/heads/master Commit: a80260eafcd652e75706f9ad5a5a1886c7b051ef Parents: 952ef6d Author: Zhitao Li <[email protected]> Authored: Thu Jan 14 10:43:32 2016 -0800 Committer: Maxim Khutornenko <[email protected]> Committed: Thu Jan 14 10:43:32 2016 -0800 ---------------------------------------------------------------------- NEWS | 2 + .../apache/aurora/scheduler/AcceptedOffer.java | 234 ++++++++++++++ .../apache/aurora/scheduler/ResourceSlot.java | 50 ++- .../org/apache/aurora/scheduler/Resources.java | 37 +-- .../mesos/CommandLineDriverSettingsModule.java | 19 +- .../scheduler/mesos/MesosTaskFactory.java | 58 ++-- .../aurora/scheduler/state/TaskAssigner.java | 2 +- .../aurora/scheduler/AcceptedOfferTest.java | 303 +++++++++++++++++++ .../aurora/scheduler/ResourceSlotTest.java | 14 +- .../CommandLineDriverSettingsModuleTest.java | 23 +- .../mesos/MesosTaskFactoryImplTest.java | 61 +++- .../scheduler/state/TaskAssignerImplTest.java | 8 +- 12 files changed, 714 insertions(+), 97 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/a80260ea/NEWS ---------------------------------------------------------------------- diff --git a/NEWS b/NEWS index acaff9e..809077f 100644 --- a/NEWS +++ b/NEWS @@ -21,6 +21,8 @@ at http://logback.qos.ch/manual/configuration.html With this change, we have removed the following scheduler command line arguments as they were made redundant: `logtostderr`, `alsologtostderr`, `vlog`, `vmodule`, and `use_glog_formatter`. 
+- Added support for configuring Mesos role by passing `-mesos_role` to Aurora scheduler at start time. + This enables resource reservation for Aurora when running in a shared Mesos cluster. 0.11.0 ------ http://git-wip-us.apache.org/repos/asf/aurora/blob/a80260ea/src/main/java/org/apache/aurora/scheduler/AcceptedOffer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/AcceptedOffer.java b/src/main/java/org/apache/aurora/scheduler/AcceptedOffer.java new file mode 100644 index 0000000..9c2dc0b --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/AcceptedOffer.java @@ -0,0 +1,234 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler; + +import java.util.List; +import java.util.Set; + +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; + +import org.apache.aurora.common.quantity.Data; +import org.apache.aurora.scheduler.base.Numbers; +import org.apache.mesos.Protos; +import org.apache.mesos.Protos.Offer; +import org.apache.mesos.Protos.Resource; + +import static java.util.Objects.requireNonNull; + +/** + * Allocate resources from an accepted Mesos Offer to TaskInfo and ExecutorInfo. 
+ */ +public final class AcceptedOffer { + + public static final String DEFAULT_ROLE_NAME = "*"; + + /** + * Reserved resource filter. + */ + public static final Predicate<Resource> RESERVED = + e -> e.hasRole() && !e.getRole().equals(DEFAULT_ROLE_NAME); + + /** + * Non reserved resource filter. + */ + public static final Predicate<Resource> NOT_RESERVED = Predicates.not(RESERVED); + + /** + * Helper function to check a resource value is small enough to be considered zero. + */ + public static boolean nearZero(double value) { + return Math.abs(value) < EPSILON; + } + + /** + * Get proper value for {@link org.apache.mesos.Protos.TaskInfo}'s resources. + * @return A list of Resource used for TaskInfo. + */ + public List<Resource> getTaskResources() { + return taskResources; + } + + /** + * Get proper value for {@link org.apache.mesos.Protos.ExecutorInfo}'s resources. + * @return A list of Resource used for ExecutorInfo. + */ + public List<Resource> getExecutorResources() { + return executorResources; + } + + /** + * Use this epsilon value to avoid comparison with zero. + */ + private static final double EPSILON = 1e-6; + + private final List<Resource> taskResources; + private final List<Resource> executorResources; + + public static AcceptedOffer create( + Offer offer, + ResourceSlot taskSlot, + ResourceSlot executorSlot, + Set<Integer> selectedPorts, + TierInfo tierInfo) throws Resources.InsufficientResourcesException { + + List<Resource> reservedFirst = ImmutableList.<Resource>builder() + .addAll(Iterables.filter(offer.getResourcesList(), RESERVED)) + .addAll(Iterables.filter(offer.getResourcesList(), NOT_RESERVED)) + .build(); + + boolean revocable = tierInfo.isRevocable(); + List<Resource.Builder> cpuResources = filterToBuilders( + reservedFirst, + ResourceType.CPUS.getName(), + revocable ? 
Resources.REVOCABLE : Resources.NON_REVOCABLE); + List<Resource.Builder> memResources = filterToBuilderNonRevocable( + reservedFirst, ResourceType.RAM_MB.getName()); + List<Resource.Builder> diskResources = filterToBuilderNonRevocable( + reservedFirst, ResourceType.DISK_MB.getName()); + List<Resource.Builder> portsResources = filterToBuilderNonRevocable( + reservedFirst, ResourceType.PORTS.getName()); + + List<Resource> taskResources = ImmutableList.<Resource>builder() + .addAll(allocateScalarType(cpuResources, taskSlot.getNumCpus(), revocable)) + .addAll(allocateScalarType(memResources, taskSlot.getRam().as(Data.MB), false)) + .addAll(allocateScalarType(diskResources, taskSlot.getDisk().as(Data.MB), false)) + .addAll(allocateRangeType(portsResources, selectedPorts)) + .build(); + + List<Resource> executorResources = ImmutableList.<Resource>builder() + .addAll(allocateScalarType(cpuResources, executorSlot.getNumCpus(), revocable)) + .addAll(allocateScalarType(memResources, executorSlot.getRam().as(Data.MB), false)) + .addAll(allocateScalarType(diskResources, executorSlot.getDisk().as(Data.MB), false)) + .build(); + + return new AcceptedOffer(taskResources, executorResources); + } + + private AcceptedOffer( + List<Resource> taskResources, + List<Resource> executorResources) { + + this.taskResources = requireNonNull(taskResources); + this.executorResources = requireNonNull(executorResources); + } + + private static List<Resource> allocateRangeType( + List<Resource.Builder> from, + Set<Integer> valueSet) throws Resources.InsufficientResourcesException { + + Set<Integer> leftOver = Sets.newHashSet(valueSet); + ImmutableList.Builder<Resource> result = ImmutableList.<Resource>builder(); + for (Resource.Builder r : from) { + Set<Integer> fromResource = Sets.newHashSet(Iterables.concat( + Iterables.transform(r.getRanges().getRangeList(), Resources.RANGE_TO_MEMBERS))); + Set<Integer> available = Sets.newHashSet(Sets.intersection(leftOver, fromResource)); + if 
(available.isEmpty()) { + continue; + } + Resource newResource = makeMesosRangeResource(r.build(), available); + result.add(newResource); + leftOver.removeAll(available); + if (leftOver.isEmpty()) { + break; + } + } + if (!leftOver.isEmpty()) { + // NOTE: this will not happen as long as Veto logic from TaskAssigner.maybeAssign is + // consistent. + // Maybe we should consider implementing resource veto with this class to ensure that. + throw new Resources.InsufficientResourcesException( + "Insufficient resource for range type when allocating from offer"); + } + return result.build(); + } + + /** + * Creates a mesos resource of integer ranges from given prototype. + * + * @param prototype Resource prototype. + * @param values Values to translate into ranges. + * @return A new mesos ranges resource. + */ + static Resource makeMesosRangeResource( + Resource prototype, + Set<Integer> values) { + + return Protos.Resource.newBuilder(prototype) + .setRanges(Protos.Value.Ranges.newBuilder() + .addAllRange( + Iterables.transform(Numbers.toRanges(values), ResourceSlot.RANGE_TRANSFORM))) + .build(); + } + + private static List<Resource> allocateScalarType( + List<Resource.Builder> from, + double amount, + boolean revocable) throws Resources.InsufficientResourcesException { + + double remaining = amount; + ImmutableList.Builder<Resource> result = ImmutableList.builder(); + for (Resource.Builder r : from) { + if (nearZero(remaining)) { + break; + } + final double available = r.getScalar().getValue(); + if (nearZero(available)) { + // Skip resource slot that is already used up. 
+ continue; + } + final double used = Math.min(remaining, available); + remaining -= used; + Resource.Builder newResource = + Resource.newBuilder(r.build()) + .setScalar(Protos.Value.Scalar.newBuilder().setValue(used).build()); + if (revocable) { + newResource.setRevocable(Resource.RevocableInfo.newBuilder()); + } + result.add(newResource.build()); + r.getScalarBuilder().setValue(available - used); + } + if (!nearZero(remaining)) { + // NOTE: this will not happen as long as Veto logic from TaskAssigner.maybeAssign is + // consistent. + // Maybe we should consider implementing resource veto with this class to ensure that. + throw new Resources.InsufficientResourcesException( + "Insufficient resource when allocating from offer"); + } + return result.build(); + } + + private static List<Resource.Builder> filterToBuilders( + List<Resource> resources, + String name, + Predicate<Resource> additionalFilter) { + + return FluentIterable.from(resources) + .filter(e -> e.getName().equals(name)) + .filter(additionalFilter) + .transform(Resource::toBuilder) + .toList(); + } + + private static List<Resource.Builder> filterToBuilderNonRevocable( + List<Resource> resources, + String name) { + + return filterToBuilders(resources, name, Resources.NON_REVOCABLE); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/a80260ea/src/main/java/org/apache/aurora/scheduler/ResourceSlot.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/ResourceSlot.java b/src/main/java/org/apache/aurora/scheduler/ResourceSlot.java index 7c3d681..86f2667 100644 --- a/src/main/java/org/apache/aurora/scheduler/ResourceSlot.java +++ b/src/main/java/org/apache/aurora/scheduler/ResourceSlot.java @@ -23,7 +23,6 @@ import java.util.function.Predicate; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.collect.ImmutableList; -import 
com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Ordering; import com.google.common.collect.Range; @@ -43,7 +42,6 @@ import static java.util.Objects.requireNonNull; import static org.apache.aurora.common.quantity.Data.BYTES; import static org.apache.aurora.scheduler.ResourceType.CPUS; import static org.apache.aurora.scheduler.ResourceType.DISK_MB; -import static org.apache.aurora.scheduler.ResourceType.PORTS; import static org.apache.aurora.scheduler.ResourceType.RAM_MB; /** @@ -62,6 +60,15 @@ public final class ResourceSlot { public static final ResourceSlot NONE = new ResourceSlot(0, Amount.of(0L, Data.BITS), Amount.of(0L, Data.BITS), 0); + /** + * Convert {@link com.google.common.collect.Range} to {@link org.apache.mesos.Protos.Value.Range}. + */ + public static final Function<Range<Integer>, Protos.Value.Range> RANGE_TRANSFORM = + input -> Protos.Value.Range.newBuilder() + .setBegin(input.lowerEndpoint()) + .setEnd(input.upperEndpoint()) + .build(); + public ResourceSlot( double numCpus, Amount<Long, Data> ram, @@ -90,26 +97,6 @@ public final class ResourceSlot { } /** - * Adapts this slot object to a list of Mesos resources. - * - * @param selectedPorts The ports selected, to be applied as concrete task ranges. - * @param tierInfo Task tier info. - * @return Mesos resources. - */ - public List<Protos.Resource> toResourceList(Set<Integer> selectedPorts, TierInfo tierInfo) { - ImmutableList.Builder<Protos.Resource> resourceBuilder = - ImmutableList.<Protos.Resource>builder() - .add(makeMesosResource(CPUS, numCpus, tierInfo.isRevocable())) - .add(makeMesosResource(DISK_MB, disk.as(Data.MB), false)) - .add(makeMesosResource(RAM_MB, ram.as(Data.MB), false)); - if (!selectedPorts.isEmpty()) { - resourceBuilder.add(makeMesosRangeResource(PORTS, selectedPorts)); - } - - return resourceBuilder.build(); - } - - /** * Ensures that the revocable setting on the executor and task CPU resources match. 
* * @param task Task to check for resource type alignment. @@ -142,23 +129,26 @@ public final class ResourceSlot { /** * Convenience method for adapting to Mesos resources without applying a port range. * - * @see {@link #toResourceList(java.util.Set, TierInfo)} * @param tierInfo Task tier info. * @return Mesos resources. */ public List<Protos.Resource> toResourceList(TierInfo tierInfo) { - return toResourceList(ImmutableSet.of(), tierInfo); + return ImmutableList.<Protos.Resource>builder() + .add(makeMesosResource(CPUS, numCpus, tierInfo.isRevocable())) + .add(makeMesosResource(DISK_MB, disk.as(Data.MB), false)) + .add(makeMesosResource(RAM_MB, ram.as(Data.MB), false)) + .build(); } /** * Creates a mesos resource of integer ranges. * * @param resourceType Resource type. - * @param values Values to translate into ranges. - * @return A mesos ranges resource. + * @param values Values to translate into ranges. + * @return A new mesos ranges resource. */ @VisibleForTesting - static Protos.Resource makeMesosRangeResource( + public static Protos.Resource makeMesosRangeResource( ResourceType resourceType, Set<Integer> values) { @@ -348,10 +338,4 @@ public final class ResourceSlot { }; private static final Predicate<Integer> IS_ZERO = e -> e == 0; - - private static final Function<Range<Integer>, Protos.Value.Range> RANGE_TRANSFORM = - input -> Protos.Value.Range.newBuilder() - .setBegin(input.lowerEndpoint()) - .setEnd(input.upperEndpoint()) - .build(); } http://git-wip-us.apache.org/repos/asf/aurora/blob/a80260ea/src/main/java/org/apache/aurora/scheduler/Resources.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/Resources.java b/src/main/java/org/apache/aurora/scheduler/Resources.java index db422a9..4baf9dd 100644 --- a/src/main/java/org/apache/aurora/scheduler/Resources.java +++ b/src/main/java/org/apache/aurora/scheduler/Resources.java @@ -62,6 +62,14 @@ public final class Resources { */ 
public static final Predicate<Resource> NON_REVOCABLE = Predicates.not(Resource::hasRevocable); + /** + * Convert range to set of integers. + */ + public static final Function<Range, Set<Integer>> RANGE_TO_MEMBERS = + range -> ContiguousSet.create( + com.google.common.collect.Range.closed((int) range.getBegin(), (int) range.getEnd()), + DiscreteDomain.integers()); + private final Iterable<Resource> mesosResources; private Resources(Iterable<Resource> mesosResources) { @@ -145,38 +153,33 @@ public final class Resources { } private double getScalarValue(String key) { - Resource resource = getResource(key); - if (resource == null) { - return 0; + Iterable<Resource> resources = getResources(key); + double value = 0; + for (Resource r : resources) { + value += r.getScalar().getValue(); } - - return resource.getScalar().getValue(); + return value; } - private Resource getResource(String key) { - return Iterables.find(mesosResources, e -> e.getName().equals(key), null); + private Iterable<Resource> getResources(String key) { + return Iterables.filter(mesosResources, e -> e.getName().equals(key)); } private Iterable<Range> getPortRanges() { - Resource resource = getResource(PORTS.getName()); - if (resource == null) { - return ImmutableList.of(); + ImmutableList.Builder<Range> ranges = ImmutableList.builder(); + for (Resource r : getResources(PORTS.getName())) { + ranges.addAll(r.getRanges().getRangeList().iterator()); } - return resource.getRanges().getRangeList(); + return ranges.build(); } /** * Thrown when there are insufficient resources to satisfy a request. 
*/ - static class InsufficientResourcesException extends RuntimeException { + public static class InsufficientResourcesException extends RuntimeException { InsufficientResourcesException(String message) { super(message); } } - - private static final Function<Range, Set<Integer>> RANGE_TO_MEMBERS = - range -> ContiguousSet.create( - com.google.common.collect.Range.closed((int) range.getBegin(), (int) range.getEnd()), - DiscreteDomain.integers()); } http://git-wip-us.apache.org/repos/asf/aurora/blob/a80260ea/src/main/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModule.java b/src/main/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModule.java index 2255dd4..7de8f4c 100644 --- a/src/main/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModule.java +++ b/src/main/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModule.java @@ -89,6 +89,12 @@ public class CommandLineDriverSettingsModule extends AbstractModule { help = "Allows receiving revocable resource offers from Mesos.") private static final Arg<Boolean> RECEIVE_REVOCABLE_RESOURCES = Arg.create(false); + @CmdLine(name = "mesos_role", + help = "The Mesos role this framework will register as. The default is to left this empty, " + + "and the framework will register without any role and only receive unreserved " + + "resources in offer.") + private static final Arg<String> MESOS_ROLE = Arg.create(); + // TODO(wfarner): Figure out a way to change this without risk of fallout (MESOS-703). 
private static final String TWITTER_FRAMEWORK_NAME = "TwitterScheduler"; @@ -99,6 +105,8 @@ public class CommandLineDriverSettingsModule extends AbstractModule { if (FRAMEWORK_ANNOUNCE_PRINCIPAL.get() && credentials.isPresent()) { principal = Optional.of(credentials.get().getPrincipal()); } + Optional<String> role = + MESOS_ROLE.hasAppliedValue() ? Optional.of(MESOS_ROLE.get()) : Optional.absent(); DriverSettings settings = new DriverSettings( MESOS_MASTER_ADDRESS.get(), credentials, @@ -106,7 +114,8 @@ public class CommandLineDriverSettingsModule extends AbstractModule { EXECUTOR_USER.get(), principal, FRAMEWORK_FAILOVER_TIMEOUT.get(), - RECEIVE_REVOCABLE_RESOURCES.get())); + RECEIVE_REVOCABLE_RESOURCES.get(), + role)); bind(DriverSettings.class).toInstance(settings); } @@ -138,7 +147,8 @@ public class CommandLineDriverSettingsModule extends AbstractModule { String executorUser, Optional<String> principal, Amount<Long, Time> failoverTimeout, - boolean revocable) { + boolean revocable, + Optional<String> role) { FrameworkInfo.Builder infoBuilder = FrameworkInfo.newBuilder() .setUser(executorUser) @@ -153,6 +163,11 @@ public class CommandLineDriverSettingsModule extends AbstractModule { if (revocable) { infoBuilder.addCapabilities(Capability.newBuilder().setType(REVOCABLE_RESOURCES)); } + + if (role.isPresent()) { + infoBuilder.setRole(role.get()); + } + return infoBuilder.build(); } http://git-wip-us.apache.org/repos/asf/aurora/blob/a80260ea/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java b/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java index 8fdadda..fcad0e7 100644 --- a/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java +++ b/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java @@ -24,7 +24,9 @@ import 
com.google.protobuf.ByteString; import org.apache.aurora.Protobufs; import org.apache.aurora.codec.ThriftBinaryCodec; +import org.apache.aurora.scheduler.AcceptedOffer; import org.apache.aurora.scheduler.ResourceSlot; +import org.apache.aurora.scheduler.Resources; import org.apache.aurora.scheduler.TierManager; import org.apache.aurora.scheduler.base.JobKeys; import org.apache.aurora.scheduler.base.SchedulerException; @@ -38,6 +40,7 @@ import org.apache.mesos.Protos; import org.apache.mesos.Protos.ContainerInfo; import org.apache.mesos.Protos.ExecutorID; import org.apache.mesos.Protos.ExecutorInfo; +import org.apache.mesos.Protos.Offer; import org.apache.mesos.Protos.Resource; import org.apache.mesos.Protos.SlaveID; import org.apache.mesos.Protos.TaskID; @@ -56,11 +59,11 @@ public interface MesosTaskFactory { * Creates a mesos task object. * * @param task Assigned task to translate into a task object. - * @param slaveId Id of the slave the task is being assigned to. + * @param offer Resource offer the task is being assigned to. * @return A new task. * @throws SchedulerException If the task could not be encoded. */ - TaskInfo createFrom(IAssignedTask task, SlaveID slaveId) throws SchedulerException; + TaskInfo createFrom(IAssignedTask task, Offer offer) throws SchedulerException; // TODO(wfarner): Move this class to its own file to reduce visibility to package private. 
class MesosTaskFactoryImpl implements MesosTaskFactory { @@ -95,9 +98,11 @@ public interface MesosTaskFactory { } @Override - public TaskInfo createFrom(IAssignedTask task, SlaveID slaveId) throws SchedulerException { + public TaskInfo createFrom(IAssignedTask task, Offer offer) throws SchedulerException { requireNonNull(task); - requireNonNull(slaveId); + requireNonNull(offer); + + SlaveID slaveId = offer.getSlaveId(); byte[] taskInBytes; try { @@ -108,13 +113,21 @@ public interface MesosTaskFactory { } ITaskConfig config = task.getTask(); - + AcceptedOffer acceptedOffer; // TODO(wfarner): Re-evaluate if/why we need to continue handling unset assignedPorts field. - List<Resource> resources = ResourceSlot.from(config).toResourceList( - task.isSetAssignedPorts() - ? ImmutableSet.copyOf(task.getAssignedPorts().values()) - : ImmutableSet.of(), - tierManager.getTier(task.getTask())); + try { + acceptedOffer = AcceptedOffer.create( + offer, + ResourceSlot.from(config), + executorSettings.getExecutorOverhead(), + task.isSetAssignedPorts() + ? 
ImmutableSet.copyOf(task.getAssignedPorts().values()) + : ImmutableSet.of(), + tierManager.getTier(task.getTask())); + } catch (Resources.InsufficientResourcesException e) { + throw new SchedulerException(e); + } + List<Resource> resources = acceptedOffer.getTaskResources(); LOG.debug( "Setting task resources to {}", @@ -128,9 +141,9 @@ public interface MesosTaskFactory { .setData(ByteString.copyFrom(taskInBytes)); if (config.getContainer().isSetMesos()) { - configureTaskForNoContainer(task, config, taskBuilder); + configureTaskForNoContainer(task, config, taskBuilder, acceptedOffer); } else if (config.getContainer().isSetDocker()) { - configureTaskForDockerContainer(task, config, taskBuilder); + configureTaskForDockerContainer(task, config, taskBuilder, acceptedOffer); } else { throw new SchedulerException("Task had no supported container set."); } @@ -141,15 +154,17 @@ public interface MesosTaskFactory { private void configureTaskForNoContainer( IAssignedTask task, ITaskConfig config, - TaskInfo.Builder taskBuilder) { + TaskInfo.Builder taskBuilder, + AcceptedOffer acceptedOffer) { - taskBuilder.setExecutor(configureTaskForExecutor(task, config).build()); + taskBuilder.setExecutor(configureTaskForExecutor(task, config, acceptedOffer).build()); } private void configureTaskForDockerContainer( IAssignedTask task, ITaskConfig taskConfig, - TaskInfo.Builder taskBuilder) { + TaskInfo.Builder taskBuilder, + AcceptedOffer acceptedOffer) { IDockerContainer config = taskConfig.getContainer().getDocker(); Iterable<Protos.Parameter> parameters = Iterables.transform(config.getParameters(), @@ -164,7 +179,7 @@ public interface MesosTaskFactory { configureContainerVolumes(containerBuilder); - ExecutorInfo.Builder execBuilder = configureTaskForExecutor(task, taskConfig) + ExecutorInfo.Builder execBuilder = configureTaskForExecutor(task, taskConfig, acceptedOffer) .setContainer(containerBuilder.build()); taskBuilder.setExecutor(execBuilder.build()); @@ -172,11 +187,18 @@ public 
interface MesosTaskFactory { private ExecutorInfo.Builder configureTaskForExecutor( IAssignedTask task, - ITaskConfig config) { + ITaskConfig config, + AcceptedOffer acceptedOffer) { - return executorSettings.getExecutorConfig().getExecutor().toBuilder() + ExecutorInfo.Builder builder = executorSettings.getExecutorConfig().getExecutor().toBuilder() .setExecutorId(getExecutorId(task.getTaskId())) .setSource(getInstanceSourceName(config, task.getInstanceId())); + List<Resource> executorResources = acceptedOffer.getExecutorResources(); + LOG.debug( + "Setting executor resources to {}", + Iterables.transform(executorResources, Protobufs::toString)); + builder.clearResources().addAllResources(executorResources); + return builder; } private void configureContainerVolumes(ContainerInfo.Builder containerBuilder) { http://git-wip-us.apache.org/repos/asf/aurora/blob/a80260ea/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java index 7e8e456..0c467a6 100644 --- a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java +++ b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java @@ -126,7 +126,7 @@ public interface TaskAssigner { LOG.info( "Offer on slave {} (id {}) is being assigned task for {}.", host, offer.getSlaveId().getValue(), taskId); - return taskFactory.createFrom(assigned, offer.getSlaveId()); + return taskFactory.createFrom(assigned, offer); } @Timed("assigner_maybe_assign") http://git-wip-us.apache.org/repos/asf/aurora/blob/a80260ea/src/test/java/org/apache/aurora/scheduler/AcceptedOfferTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/AcceptedOfferTest.java b/src/test/java/org/apache/aurora/scheduler/AcceptedOfferTest.java new 
file mode 100644 index 0000000..39096af --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/AcceptedOfferTest.java @@ -0,0 +1,303 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler; + +import java.util.Collections; +import java.util.List; +import java.util.Set; + +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; + +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Data; +import org.apache.mesos.Protos; +import org.apache.mesos.Protos.Resource; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class AcceptedOfferTest { + + private static final Optional<String> TEST_ROLE = Optional.of("test-role"); + private static final Optional<String> ABSENT_ROLE = Optional.absent(); + private static final ResourceSlot TASK_SLOT = new ResourceSlot( + 4, Amount.of(100L, Data.MB), Amount.of(200L, Data.MB), 0); + private static final ResourceSlot EXECUTOR_SLOT = new ResourceSlot( + 0.25, Amount.of(25L, Data.MB), Amount.of(75L, Data.MB), 0); + private static final ResourceSlot TOTAL_SLOT = EXECUTOR_SLOT.add(TASK_SLOT); + private static final Integer[] TASK_PORTS = {80, 90}; + private static final Set<Integer> TASK_PORTS_SET = ImmutableSet.copyOf(TASK_PORTS); + + @Test + public 
void testReservedPredicates() { + Protos.Resource withRole = makeScalar(ResourceType.CPUS.getName(), TEST_ROLE, false, 1.0); + assertTrue(AcceptedOffer.RESERVED.apply(withRole)); + assertFalse(AcceptedOffer.NOT_RESERVED.apply(withRole)); + Protos.Resource absentRole = makeScalar(ResourceType.CPUS.getName(), ABSENT_ROLE, false, 1.0); + assertFalse(AcceptedOffer.RESERVED.apply(absentRole)); + assertTrue(AcceptedOffer.NOT_RESERVED.apply(absentRole)); + } + + @Test + public void testAllocateEmpty() { + AcceptedOffer acceptedOffer = AcceptedOffer.create( + fakeOffer(Collections.emptyList()), + ResourceSlot.NONE, + ResourceSlot.NONE, + ImmutableSet.of(), + TierInfo.DEFAULT); + assertEquals(Collections.emptyList(), acceptedOffer.getTaskResources()); + assertEquals(Collections.emptyList(), acceptedOffer.getExecutorResources()); + } + + @Test + public void testAllocateRange() { + List<Resource> resources = ImmutableList.<Resource>builder() + .add(makePortResource(Optional.absent(), 80, 81, 90, 91, 92, 93)) + .add(makePortResource(TEST_ROLE, 100, 101)) + .build(); + AcceptedOffer acceptedOffer = AcceptedOffer.create( + fakeOffer(resources), + ResourceSlot.NONE, + ResourceSlot.NONE, + ImmutableSet.of(80, 90, 100), + TierInfo.DEFAULT); + + List<Resource> expected = ImmutableList.<Resource>builder() + // Because we prefer reserved resources and handle them before non-reserved resources, + // result should have ports for the reserved resources first. 
+ .add(makePortResource(TEST_ROLE, 100)) + .add(makePortResource(Optional.absent(), 80, 90)) + .build(); + assertEquals(expected, acceptedOffer.getTaskResources()); + assertEquals(Collections.emptyList(), acceptedOffer.getExecutorResources()); + } + + @Test(expected = Resources.InsufficientResourcesException.class) + public void testAllocateRangeInsufficent() { + List<Resource> resources = ImmutableList.of( + makePortResource(ABSENT_ROLE, 80), + makePortResource(ABSENT_ROLE, 100, 101)); + AcceptedOffer.create( + fakeOffer(resources), + ResourceSlot.NONE, + ResourceSlot.NONE, + ImmutableSet.of(80, 90, 100), + TierInfo.DEFAULT); + } + + @Test + public void testAllocateSingleRole() { + runAllocateSingleRole(ABSENT_ROLE, false); + runAllocateSingleRole(ABSENT_ROLE, true); + runAllocateSingleRole(TEST_ROLE, false); + runAllocateSingleRole(TEST_ROLE, true); + } + + private void runAllocateSingleRole(Optional<String> role, boolean cpuRevocable) { + List<Resource> resources = ImmutableList.<Resource>builder() + .add(makeScalar( + ResourceType.CPUS.getName(), role, cpuRevocable, TOTAL_SLOT.getNumCpus())) + .add(makeScalar( + ResourceType.RAM_MB.getName(), role, false, TOTAL_SLOT.getRam().as(Data.MB))) + .add(makeScalar( + ResourceType.DISK_MB.getName(), role, false, TOTAL_SLOT.getDisk().as(Data.MB))) + .add(makePortResource(role, TASK_PORTS)) + .build(); + Protos.Offer offer = fakeOffer(resources); + + AcceptedOffer offerAllocation = AcceptedOffer.create( + offer, TASK_SLOT, EXECUTOR_SLOT, TASK_PORTS_SET, new TierInfo(cpuRevocable)); + + List<Resource> taskList = ImmutableList.<Resource>builder() + .add(makeScalar(ResourceType.CPUS.getName(), role, cpuRevocable, TASK_SLOT.getNumCpus())) + .add(makeScalar(ResourceType.RAM_MB.getName(), role, false, TASK_SLOT.getRam().as(Data.MB))) + .add(makeScalar( + ResourceType.DISK_MB.getName(), role, false, TASK_SLOT.getDisk().as(Data.MB))) + .add(makePortResource(role, TASK_PORTS)) + .build(); + assertEquals(taskList, 
offerAllocation.getTaskResources()); + + List<Resource> executorList = ImmutableList.<Resource>builder() + .add(makeScalar( + ResourceType.CPUS.getName(), role, cpuRevocable, EXECUTOR_SLOT.getNumCpus())) + .add(makeScalar( + ResourceType.RAM_MB.getName(), role, false, EXECUTOR_SLOT.getRam().as(Data.MB))) + .add(makeScalar( + ResourceType.DISK_MB.getName(), role, false, EXECUTOR_SLOT.getDisk().as(Data.MB))) + .build(); + assertEquals(executorList, offerAllocation.getExecutorResources()); + } + + @Test(expected = Resources.InsufficientResourcesException.class) + public void testAllocateSingleRoleInsufficient() { + List<Resource> resources = ImmutableList.<Resource>builder() + // EXECUTOR_SLOT's CPU is not included here. + .add(makeScalar(ResourceType.CPUS.getName(), TEST_ROLE, false, TASK_SLOT.getNumCpus())) + .add(makeScalar( + ResourceType.RAM_MB.getName(), TEST_ROLE, false, TOTAL_SLOT.getRam().as(Data.MB))) + .add(makeScalar( + ResourceType.DISK_MB.getName(), TEST_ROLE, false, TOTAL_SLOT.getDisk().as(Data.MB))) + .add(makePortResource(TEST_ROLE, TASK_PORTS)) + .build(); + Protos.Offer offer = fakeOffer(resources); + + AcceptedOffer.create( + offer, TASK_SLOT, EXECUTOR_SLOT, TASK_PORTS_SET, new TierInfo(false)); + } + + @Test + public void testMultipleRoles() { + runMultipleRoles(false); + runMultipleRoles(true); + } + + private void runMultipleRoles(boolean cpuRevocable) { + List<Resource> resources = ImmutableList.<Resource>builder() + // Make cpus come from two roles. + .add(makeScalar( + ResourceType.CPUS.getName(), + TEST_ROLE, + cpuRevocable, + EXECUTOR_SLOT.getNumCpus())) + .add(makeScalar( + ResourceType.CPUS.getName(), + ABSENT_ROLE, + cpuRevocable, + TASK_SLOT.getNumCpus())) + // Make ram come from default role + .add(makeScalar( + ResourceType.RAM_MB.getName(), + ABSENT_ROLE, + false, + TOTAL_SLOT.getRam().as(Data.MB))) + // Make disk come from non-default role. 
+ .add(makeScalar( + ResourceType.DISK_MB.getName(), + TEST_ROLE, + false, + TOTAL_SLOT.getDisk().as(Data.MB))) + .add(makePortResource(TEST_ROLE, 80)) + .add(makePortResource(ABSENT_ROLE, 90)) + .build(); + + Protos.Offer offer = fakeOffer(resources); + + AcceptedOffer offerAllocation = AcceptedOffer.create( + offer, TASK_SLOT, EXECUTOR_SLOT, TASK_PORTS_SET, new TierInfo(cpuRevocable)); + + List<Resource> taskList = ImmutableList.<Resource>builder() + // We intentionally sliced the offer resource to not align with TASK_SLOT's num cpus. + .add(makeScalar( + ResourceType.CPUS.getName(), TEST_ROLE, cpuRevocable, EXECUTOR_SLOT.getNumCpus())) + .add(makeScalar( + ResourceType.CPUS.getName(), + ABSENT_ROLE, + cpuRevocable, + TASK_SLOT.subtract(EXECUTOR_SLOT).getNumCpus())) + .add(makeScalar( + ResourceType.RAM_MB.getName(), ABSENT_ROLE, false, TASK_SLOT.getRam().as(Data.MB))) + .add(makeScalar( + ResourceType.DISK_MB.getName(), TEST_ROLE, false, TASK_SLOT.getDisk().as(Data.MB))) + .add(makePortResource(TEST_ROLE, 80)) + .add(makePortResource(ABSENT_ROLE, 90)) + .build(); + assertEquals(taskList, offerAllocation.getTaskResources()); + + List<Resource> executorList = ImmutableList.<Resource>builder() + .add(makeScalar( + ResourceType.CPUS.getName(), ABSENT_ROLE, cpuRevocable, EXECUTOR_SLOT.getNumCpus())) + .add(makeScalar( + ResourceType.RAM_MB.getName(), ABSENT_ROLE, false, EXECUTOR_SLOT.getRam().as(Data.MB))) + .add(makeScalar( + ResourceType.DISK_MB.getName(), TEST_ROLE, false, EXECUTOR_SLOT.getDisk().as(Data.MB))) + .build(); + assertEquals(executorList, offerAllocation.getExecutorResources()); + } + + @Test(expected = Resources.InsufficientResourcesException.class) + public void testMultipleRolesInsufficient() { + // Similar to testMultipleRoles, but with some of the cpus marked as revocable. + List<Resource> resources = ImmutableList.<Resource>builder() + // Make cpus come from two roles. 
+ .add(makeScalar( + ResourceType.CPUS.getName(), + TEST_ROLE, + true, + EXECUTOR_SLOT.getNumCpus())) + .add(makeScalar( + ResourceType.CPUS.getName(), + ABSENT_ROLE, + false, + TASK_SLOT.getNumCpus())) + // Make ram come from default role + .add(makeScalar( + ResourceType.RAM_MB.getName(), + ABSENT_ROLE, + false, + TOTAL_SLOT.getRam().as(Data.MB))) + // Make disk come from non-default role. + .add(makeScalar( + ResourceType.DISK_MB.getName(), + TEST_ROLE, + false, + TOTAL_SLOT.getDisk().as(Data.MB))) + .add(makePortResource(TEST_ROLE, 80)) + .add(makePortResource(ABSENT_ROLE, 90)) + .build(); + Protos.Offer offer = fakeOffer(resources); + // We don't have enough resource to satisfy a non-revocable request. + AcceptedOffer.create( + offer, TASK_SLOT, EXECUTOR_SLOT, TASK_PORTS_SET, new TierInfo(false)); + } + + private static Resource makePortResource(Optional<String> role, Integer... values) { + Resource.Builder prototype = Resource.newBuilder() + .setType(Protos.Value.Type.RANGES) + .setName(ResourceType.PORTS.getName()); + if (role.isPresent()) { + prototype.setRole(role.get()); + } + return AcceptedOffer.makeMesosRangeResource(prototype.build(), ImmutableSet.copyOf(values)); + } + + private static Resource makeScalar( + String name, Optional<String> role, boolean revocable, double value) { + Resource.Builder resource = Resource.newBuilder() + .setName(name) + .setType(Protos.Value.Type.SCALAR) + .setScalar(Protos.Value.Scalar.newBuilder().setValue(value)); + if (role.isPresent()) { + resource.setRole(role.get()); + } + if (revocable) { + resource.setRevocable(Resource.RevocableInfo.getDefaultInstance()); + } + return resource.build(); + } + + private static Protos.Offer fakeOffer(List<Resource> resources) { + return Protos.Offer.newBuilder() + .setId(Protos.OfferID.newBuilder().setValue("offer-id")) + .setFrameworkId(Protos.FrameworkID.newBuilder().setValue("framework-id")) + .setSlaveId(Protos.SlaveID.newBuilder().setValue("slave-id")) + 
.setHostname("hostname") + .addAllResources(resources) + .build(); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/a80260ea/src/test/java/org/apache/aurora/scheduler/ResourceSlotTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/ResourceSlotTest.java b/src/test/java/org/apache/aurora/scheduler/ResourceSlotTest.java index e4ae943..52113b8 100644 --- a/src/test/java/org/apache/aurora/scheduler/ResourceSlotTest.java +++ b/src/test/java/org/apache/aurora/scheduler/ResourceSlotTest.java @@ -88,29 +88,25 @@ public class ResourceSlotTest { } @Test - public void testToResourceListNoRevocable() { + public void testToResourceListNoRevoca() { ResourceSlot resources = ResourceSlot.from(TASK); - Set<Integer> ports = ImmutableSet.of(80, 443); assertEquals( ImmutableSet.of( makeMesosResource(CPUS, TASK.getNumCpus(), false), makeMesosResource(RAM_MB, TASK.getRamMb(), false), - makeMesosResource(DISK_MB, TASK.getDiskMb(), false), - makeMesosRangeResource(PORTS, ports)), - ImmutableSet.copyOf(resources.toResourceList(ports, DEFAULT))); + makeMesosResource(DISK_MB, TASK.getDiskMb(), false)), + ImmutableSet.copyOf(resources.toResourceList(DEFAULT))); } @Test public void testToResourceListRevocable() { ResourceSlot resources = ResourceSlot.from(TASK); - Set<Integer> ports = ImmutableSet.of(80, 443); assertEquals( ImmutableSet.of( makeMesosResource(CPUS, TASK.getNumCpus(), true), makeMesosResource(RAM_MB, TASK.getRamMb(), false), - makeMesosResource(DISK_MB, TASK.getDiskMb(), false), - makeMesosRangeResource(PORTS, ports)), - ImmutableSet.copyOf(resources.toResourceList(ports, REVOCABLE_TIER))); + makeMesosResource(DISK_MB, TASK.getDiskMb(), false)), + ImmutableSet.copyOf(resources.toResourceList(REVOCABLE_TIER))); } @Test http://git-wip-us.apache.org/repos/asf/aurora/blob/a80260ea/src/test/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModuleTest.java 
---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModuleTest.java b/src/test/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModuleTest.java index 33149ab..dc964b8 100644 --- a/src/test/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModuleTest.java +++ b/src/test/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModuleTest.java @@ -28,9 +28,13 @@ import org.junit.Test; import static org.apache.mesos.Protos.FrameworkInfo.Capability.Type.REVOCABLE_RESOURCES; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; public class CommandLineDriverSettingsModuleTest { + private static final String TEST_ROLE = "test-role"; + @Test(expected = IllegalStateException.class) public void testMissingPropertiesParsing() { Properties testProperties = new Properties(); @@ -72,9 +76,11 @@ public class CommandLineDriverSettingsModuleTest { "user", Optional.absent(), Amount.of(1L, Time.MINUTES), - false); + false, + Optional.absent()); assertEquals("", info.getPrincipal()); assertEquals(0, info.getCapabilitiesCount()); + assertFalse(info.hasRole()); } @Test @@ -83,10 +89,12 @@ public class CommandLineDriverSettingsModuleTest { "user", Optional.absent(), Amount.of(1L, Time.MINUTES), - true); + true, + Optional.absent()); assertEquals("", info.getPrincipal()); assertEquals(1, info.getCapabilitiesCount()); assertEquals(REVOCABLE_RESOURCES, info.getCapabilities(0).getType()); + assertFalse(info.hasRole()); } @Test @@ -95,20 +103,25 @@ public class CommandLineDriverSettingsModuleTest { "user", Optional.of("auroraprincipal"), Amount.of(1L, Time.MINUTES), - false); + false, + Optional.absent()); assertEquals("auroraprincipal", info.getPrincipal()); assertEquals(0, info.getCapabilitiesCount()); + assertFalse(info.hasRole()); } @Test - public void 
testFrameworkInfoRevocableWithAnnouncedPrincipal() { + public void testFrameworkInfoRevocableWithAnnouncedPrincipalAndRole() { Protos.FrameworkInfo info = CommandLineDriverSettingsModule.buildFrameworkInfo( "user", Optional.of("auroraprincipal"), Amount.of(1L, Time.MINUTES), - true); + true, + Optional.of(TEST_ROLE)); assertEquals("auroraprincipal", info.getPrincipal()); assertEquals(1, info.getCapabilitiesCount()); assertEquals(REVOCABLE_RESOURCES, info.getCapabilities(0).getType()); + assertTrue(info.hasRole()); + assertEquals(TEST_ROLE, info.getRole()); } } http://git-wip-us.apache.org/repos/asf/aurora/blob/a80260ea/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java b/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java index a5793bf..066c6a3 100644 --- a/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java @@ -13,6 +13,8 @@ */ package org.apache.aurora.scheduler.mesos; +import java.util.stream.Collectors; + import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; @@ -27,6 +29,7 @@ import org.apache.aurora.gen.JobKey; import org.apache.aurora.gen.MesosContainer; import org.apache.aurora.gen.TaskConfig; import org.apache.aurora.scheduler.ResourceSlot; +import org.apache.aurora.scheduler.ResourceType; import org.apache.aurora.scheduler.Resources; import org.apache.aurora.scheduler.TierManager; import org.apache.aurora.scheduler.configuration.executor.ExecutorConfig; @@ -37,6 +40,7 @@ import org.apache.aurora.scheduler.storage.entities.ITaskConfig; import org.apache.mesos.Protos; import org.apache.mesos.Protos.ContainerInfo.DockerInfo; import 
org.apache.mesos.Protos.ExecutorInfo; +import org.apache.mesos.Protos.Offer; import org.apache.mesos.Protos.Parameter; import org.apache.mesos.Protos.Resource; import org.apache.mesos.Protos.SlaveID; @@ -46,9 +50,11 @@ import org.apache.mesos.Protos.Volume.Mode; import org.junit.Before; import org.junit.Test; +import static org.apache.aurora.scheduler.ResourceSlot.makeMesosRangeResource; import static org.apache.aurora.scheduler.TierInfo.DEFAULT; import static org.apache.aurora.scheduler.base.TaskTestUtil.REVOCABLE_TIER; import static org.apache.aurora.scheduler.mesos.TaskExecutors.NO_OVERHEAD_EXECUTOR; +import static org.apache.aurora.scheduler.mesos.TaskExecutors.SOME_OVERHEAD_EXECUTOR; import static org.apache.aurora.scheduler.mesos.TestExecutorSettings.THERMOS_CONFIG; import static org.apache.aurora.scheduler.mesos.TestExecutorSettings.THERMOS_EXECUTOR; import static org.easymock.EasyMock.expect; @@ -85,6 +91,23 @@ public class MesosTaskFactoryImplTest extends EasyMockTest { ImmutableList.of(new DockerParameter("label", "testparameter"))))))); private static final SlaveID SLAVE = SlaveID.newBuilder().setValue("slave-id").build(); + private static final Offer OFFER_THERMOS_EXECUTOR = Protos.Offer.newBuilder() + .setId(Protos.OfferID.newBuilder().setValue("offer-id")) + .setFrameworkId(Protos.FrameworkID.newBuilder().setValue("framework-id")) + .setSlaveId(SLAVE) + .setHostname("slave-hostname") + .addAllResources( + ResourceSlot.from(TASK_CONFIG).add(THERMOS_EXECUTOR.getExecutorOverhead()) + .toResourceList(DEFAULT)) + .addResources(makeMesosRangeResource(ResourceType.PORTS, ImmutableSet.of(80))) + .build(); + private static final Offer OFFER_SOME_OVERHEAD_EXECUTOR = OFFER_THERMOS_EXECUTOR.toBuilder() + .clearResources() + .addAllResources( + ResourceSlot.from(TASK_CONFIG).add(SOME_OVERHEAD_EXECUTOR.getExecutorOverhead()) + .toResourceList(DEFAULT)) + .addResources(makeMesosRangeResource(ResourceType.PORTS, ImmutableSet.of(80))) + .build(); private 
MesosTaskFactory taskFactory; private ExecutorSettings config; @@ -106,6 +129,18 @@ public class MesosTaskFactoryImplTest extends EasyMockTest { .build(); } + private static ExecutorInfo purgeZeroResources(ExecutorInfo executor) { + return executor.toBuilder() + .clearResources() + .addAllResources( + executor.getResourcesList() + .stream() + .filter( + e -> !e.hasScalar() || e.getScalar().getValue() > 0) + .collect(Collectors.toList())) + .build(); + } + @Test public void testExecutorInfoUnchanged() { expect(tierManager.getTier(TASK_CONFIG)).andReturn(DEFAULT); @@ -113,7 +148,7 @@ public class MesosTaskFactoryImplTest extends EasyMockTest { control.replay(); - TaskInfo task = taskFactory.createFrom(TASK, SLAVE); + TaskInfo task = taskFactory.createFrom(TASK, OFFER_THERMOS_EXECUTOR); assertEquals(populateDynamicFields(DEFAULT_EXECUTOR, TASK), task.getExecutor()); checkTaskResources(TASK.getTask(), task); @@ -124,9 +159,17 @@ public class MesosTaskFactoryImplTest extends EasyMockTest { expect(tierManager.getTier(TASK_CONFIG)).andReturn(REVOCABLE_TIER); taskFactory = new MesosTaskFactoryImpl(config, tierManager); + Resource revocableCPU = OFFER_THERMOS_EXECUTOR.getResources(0).toBuilder() + .setRevocable(Resource.RevocableInfo.getDefaultInstance()) + .build(); + Offer withRevocable = OFFER_THERMOS_EXECUTOR.toBuilder() + .removeResources(0) + .addResources(0, revocableCPU) + .build(); + control.replay(); - TaskInfo task = taskFactory.createFrom(TASK, SLAVE); + TaskInfo task = taskFactory.createFrom(TASK, withRevocable); checkTaskResources(TASK.getTask(), task); assertTrue(task.getResourcesList().stream().anyMatch(Resource::hasRevocable)); } @@ -142,7 +185,7 @@ public class MesosTaskFactoryImplTest extends EasyMockTest { control.replay(); - TaskInfo task = taskFactory.createFrom(IAssignedTask.build(builder), SLAVE); + TaskInfo task = taskFactory.createFrom(IAssignedTask.build(builder), OFFER_THERMOS_EXECUTOR); checkTaskResources(ITaskConfig.build(builder.getTask()), 
task); } @@ -156,9 +199,10 @@ public class MesosTaskFactoryImplTest extends EasyMockTest { control.replay(); - TaskInfo task = taskFactory.createFrom(TASK, SLAVE); + TaskInfo task = taskFactory.createFrom(TASK, OFFER_THERMOS_EXECUTOR); assertEquals( - populateDynamicFields(NO_OVERHEAD_EXECUTOR.getExecutorConfig().getExecutor(), TASK), + purgeZeroResources(populateDynamicFields( + NO_OVERHEAD_EXECUTOR.getExecutorConfig().getExecutor(), TASK)), task.getExecutor()); // Simulate the upsizing needed for the task to meet the minimum thermos requirements. @@ -177,13 +221,14 @@ public class MesosTaskFactoryImplTest extends EasyMockTest { } private TaskInfo getDockerTaskInfo(IAssignedTask task) { - config = TaskExecutors.SOME_OVERHEAD_EXECUTOR; + config = SOME_OVERHEAD_EXECUTOR; + expect(tierManager.getTier(task.getTask())).andReturn(DEFAULT); taskFactory = new MesosTaskFactoryImpl(config, tierManager); control.replay(); - return taskFactory.createFrom(task, SLAVE); + return taskFactory.createFrom(task, OFFER_SOME_OVERHEAD_EXECUTOR); } @Test @@ -217,7 +262,7 @@ public class MesosTaskFactoryImplTest extends EasyMockTest { control.replay(); - TaskInfo taskInfo = taskFactory.createFrom(TASK_WITH_DOCKER, SLAVE); + TaskInfo taskInfo = taskFactory.createFrom(TASK_WITH_DOCKER, OFFER_THERMOS_EXECUTOR); assertEquals( config.getExecutorConfig().getVolumeMounts(), taskInfo.getExecutor().getContainer().getVolumesList()); http://git-wip-us.apache.org/repos/asf/aurora/blob/a80260ea/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java index 3cbe9ac..b00add0 100644 --- a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java @@ -135,7 +135,7 
@@ public class TaskAssignerImplTest extends EasyMockTest { MESOS_OFFER.getSlaveId(), ImmutableMap.of(PORT_NAME, PORT))) .andReturn(TASK.getAssignedTask()); - expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER.getSlaveId())) + expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER)) .andReturn(TASK_INFO); control.replay(); @@ -204,7 +204,7 @@ public class TaskAssignerImplTest extends EasyMockTest { LOST, LAUNCH_FAILED_MSG)) .andReturn(StateChangeResult.SUCCESS); - expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER.getSlaveId())) + expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER)) .andReturn(TASK_INFO); control.replay(); @@ -261,7 +261,7 @@ public class TaskAssignerImplTest extends EasyMockTest { offer.getOffer().getSlaveId(), ImmutableMap.of(PORT_NAME, PORT))) .andReturn(TASK.getAssignedTask()); - expect(taskFactory.createFrom(TASK.getAssignedTask(), offer.getOffer().getSlaveId())) + expect(taskFactory.createFrom(TASK.getAssignedTask(), offer.getOffer())) .andReturn(TASK_INFO); offerManager.launchTask(offer.getOffer().getId(), TASK_INFO); @@ -313,7 +313,7 @@ public class TaskAssignerImplTest extends EasyMockTest { OFFER.getOffer().getSlaveId(), ImmutableMap.of(PORT_NAME, PORT))) .andReturn(TASK.getAssignedTask()); - expect(taskFactory.createFrom(TASK.getAssignedTask(), OFFER.getOffer().getSlaveId())) + expect(taskFactory.createFrom(TASK.getAssignedTask(), OFFER.getOffer())) .andReturn(TASK_INFO); offerManager.launchTask(OFFER.getOffer().getId(), TASK_INFO);
