http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java b/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java deleted file mode 100644 index f58c66a..0000000 --- a/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java +++ /dev/null @@ -1,294 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.aurora.scheduler.state; - -import java.util.Optional; -import java.util.Set; - -import javax.inject.Inject; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.base.Predicates; -import com.google.common.collect.FluentIterable; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; -import com.google.common.collect.Sets; -import com.google.common.eventbus.Subscribe; - -import org.apache.aurora.gen.HostStatus; -import org.apache.aurora.gen.MaintenanceMode; -import org.apache.aurora.gen.ScheduleStatus; -import org.apache.aurora.scheduler.BatchWorker; -import org.apache.aurora.scheduler.SchedulerModule.TaskEventBatchWorker; -import org.apache.aurora.scheduler.base.Query; -import org.apache.aurora.scheduler.base.Tasks; -import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber; -import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; -import org.apache.aurora.scheduler.storage.AttributeStore; -import org.apache.aurora.scheduler.storage.Storage; -import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; -import org.apache.aurora.scheduler.storage.entities.IHostAttributes; -import org.apache.aurora.scheduler.storage.entities.IHostStatus; -import org.apache.aurora.scheduler.storage.entities.IScheduledTask; -import org.apache.mesos.v1.Protos; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static java.util.Objects.requireNonNull; - -import static org.apache.aurora.gen.MaintenanceMode.DRAINED; -import static org.apache.aurora.gen.MaintenanceMode.DRAINING; - -/** - * Logic that puts hosts into maintenance mode, and triggers draining of hosts upon request. - * All state-changing functions return their results. Additionally, all state-changing functions - * will ignore requests to change state of unknown hosts and subsequently omit these hosts from - * return values. 
- */ -public interface MaintenanceController { - - /** - * Places hosts in maintenance mode. - * Hosts in maintenance mode are less-preferred for scheduling. - * No change will be made for hosts that are not recognized, and unrecognized hosts will not be - * included in the result. - * - * @param hosts Hosts to put into maintenance mode. - * @return The adjusted state of the hosts. - */ - Set<IHostStatus> startMaintenance(Set<String> hosts); - - /** - * Initiate a drain of all active tasks on {@code hosts}. - * - * @param hosts Hosts to drain. - * @return The adjusted state of the hosts. Hosts without any active tasks will be immediately - * moved to DRAINED. - */ - Set<IHostStatus> drain(Set<String> hosts); - - /** - * Drain tasks defined by the inverse offer. - * This method doesn't set any host attributes. - * - * @param inverseOffer the inverse offer to use. - */ - void drainForInverseOffer(Protos.InverseOffer inverseOffer); - - /** - * Fetches the current maintenance mode of {$code host}. - * - * @param host Host to fetch state for. - * @return Maintenance mode of host, {@link MaintenanceMode#NONE} if the host is not known. - */ - MaintenanceMode getMode(String host); - - /** - * Fetches the current state of {@code hosts}. - * - * @param hosts Hosts to fetch state for. - * @return The state of the hosts. - */ - Set<IHostStatus> getStatus(Set<String> hosts); - - /** - * Moves {@code hosts} out of maintenance mode, returning them to mode NONE. - * - * @param hosts Hosts to move out of maintenance mode. - * @return The adjusted state of the hosts. 
- */ - Set<IHostStatus> endMaintenance(Set<String> hosts); - - class MaintenanceControllerImpl implements MaintenanceController, EventSubscriber { - private static final Logger LOG = LoggerFactory.getLogger(MaintenanceControllerImpl.class); - private final Storage storage; - private final StateManager stateManager; - private final TaskEventBatchWorker batchWorker; - - @Inject - public MaintenanceControllerImpl( - Storage storage, - StateManager stateManager, - TaskEventBatchWorker batchWorker) { - - this.storage = requireNonNull(storage); - this.stateManager = requireNonNull(stateManager); - this.batchWorker = requireNonNull(batchWorker); - } - - private Set<String> drainTasksOnHost(String host, MutableStoreProvider store) { - Query.Builder query = Query.slaveScoped(host).active(); - Set<String> activeTasks = FluentIterable.from(store.getTaskStore().fetchTasks(query)) - .transform(Tasks::id) - .toSet(); - - if (activeTasks.isEmpty()) { - LOG.info("No tasks to drain on host: {}", host); - // Simple way to avoid the log message if there are no tasks. - return activeTasks; - } else { - LOG.info("Draining tasks: {} on host: {}", activeTasks, host); - for (String taskId : activeTasks) { - stateManager.changeState( - store, - taskId, - Optional.empty(), - ScheduleStatus.DRAINING, - DRAINING_MESSAGE); - } - - return activeTasks; - } - - } - - private Set<IHostStatus> watchDrainingTasks(MutableStoreProvider store, Set<String> hosts) { - LOG.info("Hosts to drain: " + hosts); - Set<String> emptyHosts = Sets.newHashSet(); - for (String host : hosts) { - Set<String> drainedTasks = drainTasksOnHost(host, store); - // If there are no tasks on the host, immediately transition to DRAINED. 
- if (drainedTasks.isEmpty()) { - emptyHosts.add(host); - } - } - - return ImmutableSet.<IHostStatus>builder() - .addAll(setMaintenanceMode(store, emptyHosts, DRAINED)) - .addAll(setMaintenanceMode(store, Sets.difference(hosts, emptyHosts), DRAINING)) - .build(); - } - - /** - * Notifies the MaintenanceController that a task has changed state. - * - * @param change Event - */ - @Subscribe - public void taskChangedState(final TaskStateChange change) { - if (Tasks.isTerminated(change.getNewState())) { - final String host = change.getTask().getAssignedTask().getSlaveHost(); - batchWorker.execute(store -> { - // If the task _was_ associated with a draining host, and it was the last task on the - // host. - Optional<IHostAttributes> attributes = - store.getAttributeStore().getHostAttributes(host); - if (attributes.isPresent() && attributes.get().getMode() == DRAINING) { - Query.Builder builder = Query.slaveScoped(host).active(); - Iterable<IScheduledTask> activeTasks = store.getTaskStore().fetchTasks(builder); - if (Iterables.isEmpty(activeTasks)) { - LOG.info("Moving host {} into DRAINED", host); - setMaintenanceMode(store, ImmutableSet.of(host), DRAINED); - } else { - LOG.info("Host {} is DRAINING with active tasks: {}", host, Tasks.ids(activeTasks)); - } - } - return BatchWorker.NO_RESULT; - }); - } - } - - @Override - public Set<IHostStatus> startMaintenance(Set<String> hosts) { - return storage.write( - storeProvider -> setMaintenanceMode(storeProvider, hosts, MaintenanceMode.SCHEDULED)); - } - - @VisibleForTesting - static final Optional<String> DRAINING_MESSAGE = - Optional.of("Draining machine for maintenance."); - - @Override - public Set<IHostStatus> drain(Set<String> hosts) { - return storage.write(store -> watchDrainingTasks(store, hosts)); - } - - private Optional<String> getHostname(Protos.InverseOffer offer) { - if (offer.getUrl().getAddress().hasHostname()) { - return Optional.of(offer.getUrl().getAddress().getHostname()); - } else { - return 
Optional.empty(); - } - } - - @Override - public void drainForInverseOffer(Protos.InverseOffer offer) { - // TaskStore does not allow for querying by agent id. - Optional<String> hostname = getHostname(offer); - - if (hostname.isPresent()) { - String host = hostname.get(); - storage.write(storeProvider -> drainTasksOnHost(host, storeProvider)); - } else { - LOG.error("Unable to drain tasks on agent {} because " - + "no hostname attached to inverse offer {}.", offer.getAgentId(), offer.getId()); - } - } - - private static final Function<IHostAttributes, String> HOST_NAME = - IHostAttributes::getHost; - - private static final Function<IHostAttributes, IHostStatus> ATTRS_TO_STATUS = - attributes -> IHostStatus.build( - new HostStatus().setHost(attributes.getHost()).setMode(attributes.getMode())); - - private static final Function<IHostStatus, MaintenanceMode> GET_MODE = IHostStatus::getMode; - - @Override - public MaintenanceMode getMode(final String host) { - return storage.read(storeProvider -> storeProvider.getAttributeStore().getHostAttributes(host) - .map(ATTRS_TO_STATUS) - .map(GET_MODE) - .orElse(MaintenanceMode.NONE)); - } - - @Override - public Set<IHostStatus> getStatus(final Set<String> hosts) { - return storage.read(storeProvider -> { - // Warning - this is filtering _all_ host attributes. If using this to frequently query - // for a small set of hosts, a getHostAttributes variant should be added. 
- return FluentIterable.from(storeProvider.getAttributeStore().getHostAttributes()) - .filter(Predicates.compose(Predicates.in(hosts), HOST_NAME)) - .transform(ATTRS_TO_STATUS) - .toSet(); - }); - } - - @Override - public Set<IHostStatus> endMaintenance(final Set<String> hosts) { - return storage.write( - storeProvider -> setMaintenanceMode(storeProvider, hosts, MaintenanceMode.NONE)); - } - - private Set<IHostStatus> setMaintenanceMode( - MutableStoreProvider storeProvider, - Set<String> hosts, - MaintenanceMode mode) { - - AttributeStore.Mutable store = storeProvider.getAttributeStore(); - ImmutableSet.Builder<IHostStatus> statuses = ImmutableSet.builder(); - for (String host : hosts) { - LOG.info("Setting maintenance mode to {} for host {}", mode, host); - Optional<IHostAttributes> toSave = AttributeStore.Util.mergeMode(store, host, mode); - if (toSave.isPresent()) { - store.saveHostAttributes(toSave.get()); - LOG.info("Updated host attributes: " + toSave.get()); - statuses.add(IHostStatus.build(new HostStatus().setHost(host).setMode(mode))); - } - } - return statuses.build(); - } - } -}
http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/main/java/org/apache/aurora/scheduler/state/StateModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateModule.java b/src/main/java/org/apache/aurora/scheduler/state/StateModule.java index 0e0f90b..dfc2b5c 100644 --- a/src/main/java/org/apache/aurora/scheduler/state/StateModule.java +++ b/src/main/java/org/apache/aurora/scheduler/state/StateModule.java @@ -18,10 +18,8 @@ import javax.inject.Singleton; import com.beust.jcommander.Parameter; import com.beust.jcommander.Parameters; -import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.inject.AbstractModule; -import com.google.inject.Binder; import com.google.inject.Module; import org.apache.aurora.scheduler.app.MoreModules; @@ -31,7 +29,6 @@ import org.apache.aurora.scheduler.events.PubsubEventModule; import org.apache.aurora.scheduler.mesos.MesosTaskFactory; import org.apache.aurora.scheduler.mesos.MesosTaskFactory.MesosTaskFactoryImpl; import org.apache.aurora.scheduler.scheduling.TaskAssignerImplModule; -import org.apache.aurora.scheduler.state.MaintenanceController.MaintenanceControllerImpl; import org.apache.aurora.scheduler.state.UUIDGenerator.UUIDGeneratorImpl; /** @@ -69,16 +66,7 @@ public class StateModule extends AbstractModule { PubsubEventModule.bindSubscriber(binder(), PartitionManager.class); - bindMaintenanceController(binder()); - bind(ClusterState.class).to(ClusterStateImpl.class); bind(ClusterStateImpl.class).in(Singleton.class); } - - @VisibleForTesting - static void bindMaintenanceController(Binder binder) { - binder.bind(MaintenanceController.class).to(MaintenanceControllerImpl.class); - binder.bind(MaintenanceControllerImpl.class).in(Singleton.class); - PubsubEventModule.bindSubscriber(binder, MaintenanceControllerImpl.class); - } } 
http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/main/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImpl.java b/src/main/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImpl.java index e88cad6..800abfd 100644 --- a/src/main/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImpl.java +++ b/src/main/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImpl.java @@ -354,9 +354,13 @@ class ReadOnlySchedulerImpl implements ReadOnlyScheduler.Iface { public Response getJobUpdateDiff(JobUpdateRequest mutableRequest) { IJobUpdateRequest request; try { - request = IJobUpdateRequest.build(new JobUpdateRequest(mutableRequest).setTaskConfig( - configurationManager.validateAndPopulate( - ITaskConfig.build(mutableRequest.getTaskConfig())).newBuilder())); + request = IJobUpdateRequest.build( + new JobUpdateRequest(mutableRequest) + .setTaskConfig(configurationManager + .validateAndPopulate( + ITaskConfig.build(mutableRequest.getTaskConfig()), + mutableRequest.getInstanceCount()) + .newBuilder())); } catch (TaskDescriptionException e) { return error(INVALID_REQUEST, e); } http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java index 9fc0416..6a28bc2 100644 --- a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java +++ b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java @@ -63,6 +63,7 @@ import org.apache.aurora.gen.ResourceAggregate; import org.apache.aurora.gen.Response; import 
org.apache.aurora.gen.Result; import org.apache.aurora.gen.ScheduleStatus; +import org.apache.aurora.gen.SlaPolicy; import org.apache.aurora.gen.StartJobUpdateResult; import org.apache.aurora.gen.StartMaintenanceResult; import org.apache.aurora.gen.TaskQuery; @@ -76,11 +77,11 @@ import org.apache.aurora.scheduler.configuration.SanitizedConfiguration; import org.apache.aurora.scheduler.cron.CronException; import org.apache.aurora.scheduler.cron.CronJobManager; import org.apache.aurora.scheduler.cron.SanitizedCronJob; +import org.apache.aurora.scheduler.maintenance.MaintenanceController; import org.apache.aurora.scheduler.quota.QuotaCheckResult; import org.apache.aurora.scheduler.quota.QuotaManager; import org.apache.aurora.scheduler.quota.QuotaManager.QuotaException; import org.apache.aurora.scheduler.reconciliation.TaskReconciler; -import org.apache.aurora.scheduler.state.MaintenanceController; import org.apache.aurora.scheduler.state.StateChangeResult; import org.apache.aurora.scheduler.state.StateManager; import org.apache.aurora.scheduler.state.UUIDGenerator; @@ -156,6 +157,8 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin { @VisibleForTesting static final String DRAIN_HOSTS = STAT_PREFIX + "drainHosts"; @VisibleForTesting + static final String SLA_DRAIN_HOSTS = STAT_PREFIX + "slaDrainHosts"; + @VisibleForTesting static final String MAINTENANCE_STATUS = STAT_PREFIX + "maintenanceStatus"; @VisibleForTesting static final String END_MAINTENANCE = STAT_PREFIX + "endMaintenance"; @@ -190,6 +193,7 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin { private final AtomicLong restartShardsCounter; private final AtomicLong startMaintenanceCounter; private final AtomicLong drainHostsCounter; + private final AtomicLong slaDrainHostsCounter; private final AtomicLong maintenanceStatusCounter; private final AtomicLong endMaintenanceCounter; private final AtomicLong addInstancesCounter; @@ -237,6 +241,7 @@ class SchedulerThriftInterface 
implements AnnotatedAuroraAdmin { this.restartShardsCounter = statsProvider.makeCounter(RESTART_SHARDS); this.startMaintenanceCounter = statsProvider.makeCounter(START_MAINTENANCE); this.drainHostsCounter = statsProvider.makeCounter(DRAIN_HOSTS); + this.slaDrainHostsCounter = statsProvider.makeCounter(SLA_DRAIN_HOSTS); this.maintenanceStatusCounter = statsProvider.makeCounter(MAINTENANCE_STATUS); this.endMaintenanceCounter = statsProvider.makeCounter(END_MAINTENANCE); this.addInstancesCounter = statsProvider.makeCounter(ADD_INSTANCES); @@ -586,6 +591,14 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin { } @Override + public Response slaDrainHosts(Hosts hosts, SlaPolicy defaultSlaPolicy, long timeoutSecs) { + slaDrainHostsCounter.addAndGet(hosts.getHostNamesSize()); + return ok(Result.drainHostsResult( + new DrainHostsResult().setStatuses(IHostStatus.toBuildersSet( + maintenance.slaDrain(hosts.getHostNames(), defaultSlaPolicy, timeoutSecs))))); + } + + @Override public Response forceTaskState(String taskId, ScheduleStatus status) { checkNotBlank(taskId); requireNonNull(status); @@ -807,9 +820,12 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin { IJobUpdateRequest request; try { - request = IJobUpdateRequest.build(new JobUpdateRequest(mutableRequest).setTaskConfig( - configurationManager.validateAndPopulate( - ITaskConfig.build(mutableRequest.getTaskConfig())).newBuilder())); + request = IJobUpdateRequest.build( + new JobUpdateRequest(mutableRequest) + .setTaskConfig(configurationManager.validateAndPopulate( + ITaskConfig.build(mutableRequest.getTaskConfig()), + mutableRequest.getInstanceCount()) + .newBuilder())); } catch (TaskDescriptionException e) { return error(INVALID_REQUEST, e); } http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/main/python/apache/aurora/admin/admin_util.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/admin/admin_util.py 
b/src/main/python/apache/aurora/admin/admin_util.py index 8240e80..964876b 100644 --- a/src/main/python/apache/aurora/admin/admin_util.py +++ b/src/main/python/apache/aurora/admin/admin_util.py @@ -100,6 +100,29 @@ OVERRIDE_SLA_REASON_OPTION = optparse.Option( help='Reason for overriding default SLA values. Provide details including the ' 'maintenance ticket number.') +DEFAULT_SLA_PERCENTAGE_OPTION = optparse.Option( + '--default_percentage', + dest='default_percentage', + default=95, + help='Percentage of tasks required to be up all the time within the duration. ' + 'This percentage will be used for SLA calculation if tasks do not ' + 'have a configured SlaPolicy.') + +DEFAULT_SLA_DURATION_OPTION = optparse.Option( + '--default_duration', + dest='default_duration', + default='30m', + help='Time interval (now - value) for the percentage of up tasks. Format: XdYhZmWs. ' + 'This duration will be used for SLA calculation if tasks do not ' + 'have a configured SlaPolicy.') + +FORCE_DRAIN_TIMEOUT_OPTION = optparse.Option( + '--force_drain_timeout', + dest='timeout', + default='7d', + help='Time interval (now - value) after which tasks will be forcefully drained. 
' + 'Format: XdYhZmWs.') + UNSAFE_SLA_HOSTS_FILE_OPTION = optparse.Option( '--unsafe_hosts_file', dest='unsafe_hosts_filename', @@ -213,6 +236,7 @@ def parse_and_validate_sla_overrides(options, hostnames): if has_override != all_overrides: die('All --override_* options are required when attempting to override default SLA values.') + print(options.percentage) percentage = parse_sla_percentage(options.percentage) if options.percentage else None duration = parse_time(options.duration) if options.duration else None if options.reason: @@ -230,6 +254,20 @@ def parse_and_validate_sla_overrides(options, hostnames): return percentage or SLA_UPTIME_PERCENTAGE_LIMIT, duration or SLA_UPTIME_DURATION_LIMIT +def parse_and_validate_sla_drain_default(options): + """Parses and validates host SLA default 3-tuple (percentage, duration, timeout). + + :param options: command line options + :type options: list of app.option + :rtype: a tuple of: default percentage (float), default duration (Amount) and timeout (Amount) + """ + percentage = parse_sla_percentage(options.default_percentage) + duration = parse_time(options.default_duration).as_(Time.SECONDS) + timeout = parse_time(options.timeout).as_(Time.SECONDS) + + return percentage, duration, timeout + + def print_results(results): """Prints formatted SLA results. http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/main/python/apache/aurora/admin/host_maintenance.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/admin/host_maintenance.py b/src/main/python/apache/aurora/admin/host_maintenance.py index 83fc2b6..3ccf322 100644 --- a/src/main/python/apache/aurora/admin/host_maintenance.py +++ b/src/main/python/apache/aurora/admin/host_maintenance.py @@ -170,6 +170,7 @@ class HostMaintenance(object): for hostname in drained_hosts: callback(hostname) + # Deprecated: See AURORA-1986. 
def perform_maintenance(self, hostnames, grouping_function=DEFAULT_GROUPING, percentage=None, duration=None, output_file=None, callback=None): """Put hosts into maintenance mode and drain them. @@ -234,6 +235,25 @@ class HostMaintenance(object): return set(hostnames) - not_drained_hostnames + def perform_sla_maintenance(self, hostnames, percentage, duration, timeout): + """Submits hosts for SLA-aware to the scheduler. + + + :param hostnames: A list of hostnames to operate upon + :type hostnames: list of strings + :param percentage: SLA percentage to use + :type percentage: float + :param duration: SLA duration to use + :type duration: twitter.common.quantity.Time + :param timeout: Force drain timeout + :type duration: twitter.common.quantity.Time + :rtype: set of host names that were successfully drained + """ + drainable_hosts = Hosts(set(hostnames)) + check_and_log_response( + self._client.sla_drain_hosts(drainable_hosts, percentage, duration, timeout)) + return drainable_hosts + def check_status(self, hostnames): """Query the scheduler to determine the maintenance status for a list of hostnames http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/main/python/apache/aurora/admin/maintenance.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/admin/maintenance.py b/src/main/python/apache/aurora/admin/maintenance.py index 942a237..33a7eea 100644 --- a/src/main/python/apache/aurora/admin/maintenance.py +++ b/src/main/python/apache/aurora/admin/maintenance.py @@ -18,13 +18,17 @@ from apache.aurora.client.base import GROUPING_OPTION, get_grouping_or_die, requ from apache.aurora.common.clusters import CLUSTERS from .admin_util import ( + DEFAULT_SLA_DURATION_OPTION, + DEFAULT_SLA_PERCENTAGE_OPTION, FILENAME_OPTION, + FORCE_DRAIN_TIMEOUT_OPTION, HOSTS_OPTION, OVERRIDE_SLA_DURATION_OPTION, OVERRIDE_SLA_PERCENTAGE_OPTION, OVERRIDE_SLA_REASON_OPTION, POST_DRAIN_SCRIPT_OPTION, 
UNSAFE_SLA_HOSTS_FILE_OPTION, + parse_and_validate_sla_drain_default, parse_and_validate_sla_overrides, parse_hostnames, parse_script @@ -132,6 +136,47 @@ def host_drain(cluster): @app.command +@app.command_option(FORCE_DRAIN_TIMEOUT_OPTION) +@app.command_option(FILENAME_OPTION) +@app.command_option(HOSTS_OPTION) +@app.command_option(DEFAULT_SLA_PERCENTAGE_OPTION) +@app.command_option(DEFAULT_SLA_DURATION_OPTION) +@requires.exactly('cluster') +def sla_host_drain(cluster): + """usage: sla_host_drain {--filename=filename | --hosts=hosts} + [--default_percentage=percentage] + [--default_duration=duration] + [--force_drain_timeout=timeout] + cluster + + Asks the scheduler to drain the list of provided hosts in an SLA-aware manner. + + The list of hosts is drained and marked in a drained state. This will kill + off any tasks currently running on these hosts, as well as prevent future + tasks from scheduling on these hosts while they are drained. + + The hosts are left in maintenance mode upon completion. Use host_activate to + return hosts back to service and allow scheduling tasks on them. + + If tasks are unable to be drained after the specified timeout interval they will + be forcefully drained even if it breaks SLA. 
+ """ + options = app.get_options() + drainable_hosts = parse_hostnames(options.filename, options.hosts) + + percentage, duration, timeout = parse_and_validate_sla_drain_default(options) + + HostMaintenance( + cluster=CLUSTERS[cluster], + verbosity=options.verbosity, + bypass_leader_redirect=options.bypass_leader_redirect).perform_sla_maintenance( + drainable_hosts, + percentage=percentage, + duration=duration, + timeout=timeout) + + +@app.command @app.command_option(FILENAME_OPTION) @app.command_option(HOSTS_OPTION) @requires.exactly('cluster') http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/main/python/apache/aurora/client/api/__init__.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/client/api/__init__.py b/src/main/python/apache/aurora/client/api/__init__.py index f6fd1dd..f1a82cb 100644 --- a/src/main/python/apache/aurora/client/api/__init__.py +++ b/src/main/python/apache/aurora/client/api/__init__.py @@ -33,8 +33,10 @@ from gen.apache.aurora.api.ttypes import ( JobUpdateQuery, JobUpdateRequest, Metadata, + PercentageSlaPolicy, Resource, ResourceAggregate, + SlaPolicy, TaskQuery ) @@ -298,6 +300,16 @@ class AuroraClientAPI(object): log.info("Draining tasks on: %s" % hosts.hostNames) return self._scheduler_proxy.drainHosts(hosts) + def sla_drain_hosts(self, hosts, percentage=None, duration=None, timeout=None): + log.info("Asking scheduler to drain tasks by SLA on: %s" % hosts.hostNames) + return self._scheduler_proxy.slaDrainHosts( + hosts, + SlaPolicy( + percentageSlaPolicy=PercentageSlaPolicy( + percentage=percentage, + durationSecs=duration)), + timeout) + def maintenance_status(self, hosts): log.info("Maintenance status for: %s" % hosts.hostNames) # read-only calls are retriable. 
http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/main/python/apache/aurora/client/cli/context.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/client/cli/context.py b/src/main/python/apache/aurora/client/cli/context.py index 06b1941..ad16c2d 100644 --- a/src/main/python/apache/aurora/client/cli/context.py +++ b/src/main/python/apache/aurora/client/cli/context.py @@ -244,3 +244,19 @@ class AuroraCommandContext(Context): raise self.CommandError(EXIT_INVALID_PARAMETER, "Invalid instance parameter: %s" % (list(unrecognized))) return active + + def has_count_or_percentage_sla_policy(self, key): + """Returns true if any of the tasks has count or percentage sla policy, false otherwise. + + :param key: Job key + :type key: AuroraJobKey + :return: true if any of the tasks has count or percentage sla policy, false otherwise. + """ + for task in self.get_active_tasks(key): + if task.assignedTask.task.slaPolicy: + if task.assignedTask.task.slaPolicy.percentageSlaPolicy: + return True + if task.assignedTask.task.slaPolicy.countSlaPolicy: + return True + + return False http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/main/python/apache/aurora/client/cli/jobs.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/client/cli/jobs.py b/src/main/python/apache/aurora/client/cli/jobs.py index 536d04a..f5b6114 100644 --- a/src/main/python/apache/aurora/client/cli/jobs.py +++ b/src/main/python/apache/aurora/client/cli/jobs.py @@ -379,6 +379,9 @@ class AddCommand(Verb): active = context.get_active_instances_or_raise(job, [instance]) start = max(list(active)) + 1 + if context.has_count_or_percentage_sla_policy(job): + context.print_out("WARNING: Adding instances without updating SlaPolicy.") + api = context.get_api(job.cluster) resp = api.add_instances(job, instance, count) context.log_response_and_raise(resp) @@ -410,6 +413,10 @@ 
class KillCommand(AbstractKillCommand): raise context.CommandError(EXIT_INVALID_PARAMETER, "The instances list cannot be omitted in a kill command!; " "use killall to kill all instances") + + if context.has_count_or_percentage_sla_policy(job): + context.print_out("WARNING: Killing instances without updating SlaPolicy.") + if context.options.strict: context.get_active_instances_or_raise(job, instances_arg) api = context.get_api(job.cluster) http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java index 63c338e..534a912 100644 --- a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java +++ b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java @@ -39,12 +39,14 @@ import com.google.inject.Guice; import com.google.inject.Injector; import com.google.inject.Key; import com.google.inject.Module; +import com.google.inject.TypeLiteral; import org.apache.aurora.GuavaUtils; import org.apache.aurora.codec.ThriftBinaryCodec.CodingException; import org.apache.aurora.common.application.Lifecycle; import org.apache.aurora.common.quantity.Amount; import org.apache.aurora.common.quantity.Data; +import org.apache.aurora.common.quantity.Time; import org.apache.aurora.common.stats.Stats; import org.apache.aurora.common.zookeeper.Credentials; import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperTest; @@ -64,6 +66,7 @@ import org.apache.aurora.scheduler.AppStartup; import org.apache.aurora.scheduler.TierModule; import org.apache.aurora.scheduler.base.TaskTestUtil; import org.apache.aurora.scheduler.config.CliOptions; +import org.apache.aurora.scheduler.config.types.TimeAmount; import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings; import 
org.apache.aurora.scheduler.discovery.ServiceDiscoveryModule; import org.apache.aurora.scheduler.discovery.ZooKeeperConfig; @@ -71,6 +74,7 @@ import org.apache.aurora.scheduler.log.Log; import org.apache.aurora.scheduler.log.Log.Entry; import org.apache.aurora.scheduler.log.Log.Position; import org.apache.aurora.scheduler.log.Log.Stream; +import org.apache.aurora.scheduler.maintenance.MaintenanceController; import org.apache.aurora.scheduler.mesos.DriverFactory; import org.apache.aurora.scheduler.mesos.DriverSettings; import org.apache.aurora.scheduler.mesos.FrameworkInfoFactory; @@ -207,6 +211,11 @@ public class SchedulerIT extends BaseZooKeeperTest { new ServerInfo() .setClusterName(CLUSTER_NAME) .setStatsUrlPrefix(STATS_URL_PREFIX))); + + bind(new TypeLiteral<Amount<Long, Time>>() { }) + .annotatedWith( + MaintenanceController.MaintenanceControllerImpl.PollingInterval.class) + .toInstance(new TimeAmount(1, Time.MINUTES)); } }; ZooKeeperConfig zkClientConfig = http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/test/java/org/apache/aurora/scheduler/base/TaskTestUtil.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/base/TaskTestUtil.java b/src/test/java/org/apache/aurora/scheduler/base/TaskTestUtil.java index 8c1f5ce..abee095 100644 --- a/src/test/java/org/apache/aurora/scheduler/base/TaskTestUtil.java +++ b/src/test/java/org/apache/aurora/scheduler/base/TaskTestUtil.java @@ -91,6 +91,8 @@ public final class TaskTestUtil { true, true, true, + 2, + 1800L, ConfigurationManager.DEFAULT_ALLOWED_JOB_ENVIRONMENTS); public static final ExecutorID EXECUTOR_ID = ExecutorID.newBuilder() .setValue("PLACEHOLDER") @@ -126,7 +128,7 @@ public final class TaskTestUtil { .setPriority(1) .setMaxTaskFailures(-1) .setProduction(prod) - .setTier(PROD_TIER_NAME) + .setTier(prod ? 
PROD_TIER_NAME : DEV_TIER_NAME) .setPartitionPolicy(new PartitionPolicy().setDelaySecs(5).setReschedule(true)) .setSlaPolicy(SlaPolicy.percentageSlaPolicy( new PercentageSlaPolicy().setPercentage(95.0).setDurationSecs(1800))) http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java b/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java index e66ec11..dcf5889 100644 --- a/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java +++ b/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java @@ -231,9 +231,13 @@ public class CommandLineTest { expected.mesosLog.coordinatorElectionRetries = 42; expected.mesosLog.readTimeout = TEST_TIME; expected.mesosLog.writeTimeout = TEST_TIME; - expected.sla.slaRefreshInterval = TEST_TIME; + expected.sla.minRequiredInstances = 42; + expected.sla.maxParallelCoordinators = 42; + expected.sla.maxSlaDuration = TEST_TIME; + expected.sla.slaCoordinatorTimeout = TEST_TIME; expected.sla.slaProdMetrics = ImmutableList.of(MetricCategory.JOB_UPTIMES); expected.sla.slaNonProdMetrics = ImmutableList.of(MetricCategory.JOB_UPTIMES); + expected.sla.slaRefreshInterval = TEST_TIME; expected.webhook.webhookConfigFile = tempFile; expected.scheduler.maxRegistrationDelay = TEST_TIME; expected.scheduler.maxLeadingDuration = TEST_TIME; @@ -251,6 +255,7 @@ public class CommandLineTest { expected.cron.cronMaxBatchSize = 42; expected.resourceSettings.enableRevocableCpus = false; expected.resourceSettings.enableRevocableRam = true; + expected.maintenance.hostMaintenancePollingInterval = TEST_TIME; assertAllNonDefaultParameters(expected); @@ -385,7 +390,12 @@ public class CommandLineTest { "-cron_scheduling_max_batch_size=42", "-enable_revocable_cpus=false", "-enable_revocable_ram=true", - 
"-partition_aware=true" + "-partition_aware=true", + "-sla_coordinator_timeout=42days", + "-host_maintenance_polling_interval=42days", + "-max_parallel_coordinated_maintenance=42", + "-min_required_instances_for_sla_check=42", + "-max_sla_duration_secs=42days" ); assertEqualOptions(expected, parsed); } http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/test/java/org/apache/aurora/scheduler/configuration/ConfigurationManagerTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/configuration/ConfigurationManagerTest.java b/src/test/java/org/apache/aurora/scheduler/configuration/ConfigurationManagerTest.java index 749ffea..a99fa7a 100644 --- a/src/test/java/org/apache/aurora/scheduler/configuration/ConfigurationManagerTest.java +++ b/src/test/java/org/apache/aurora/scheduler/configuration/ConfigurationManagerTest.java @@ -22,6 +22,8 @@ import com.google.common.collect.ImmutableSet; import org.apache.aurora.gen.AppcImage; import org.apache.aurora.gen.Constraint; import org.apache.aurora.gen.Container; +import org.apache.aurora.gen.CoordinatorSlaPolicy; +import org.apache.aurora.gen.CountSlaPolicy; import org.apache.aurora.gen.CronCollisionPolicy; import org.apache.aurora.gen.DockerParameter; import org.apache.aurora.gen.ExecutorConfig; @@ -33,6 +35,8 @@ import org.apache.aurora.gen.LimitConstraint; import org.apache.aurora.gen.MesosContainer; import org.apache.aurora.gen.MesosFetcherURI; import org.apache.aurora.gen.Mode; +import org.apache.aurora.gen.PercentageSlaPolicy; +import org.apache.aurora.gen.SlaPolicy; import org.apache.aurora.gen.TaskConfig; import org.apache.aurora.gen.TaskConstraint; import org.apache.aurora.gen.ValueConstraint; @@ -74,6 +78,9 @@ public class ConfigurationManagerTest { private static final ImmutableSet<Container._Fields> ALL_CONTAINER_TYPES = ImmutableSet.copyOf(Container._Fields.values()); + private static final int MIN_REQUIRED_INSTANCES = 20; 
+ private static final long MAX_SLA_DURATION_SECS = 7200; + private static final long DEFAULT_SLA_PERCENTAGE = 95; private static final JobKey JOB_KEY = new JobKey("owner-role", "devel", "email_stats"); private static final JobConfiguration UNSANITIZED_JOB_CONFIGURATION = new JobConfiguration() @@ -126,6 +133,8 @@ public class ConfigurationManagerTest { false, true, false, + MIN_REQUIRED_INSTANCES, + MAX_SLA_DURATION_SECS, ConfigurationManager.DEFAULT_ALLOWED_JOB_ENVIRONMENTS), TaskTestUtil.TIER_MANAGER, TaskTestUtil.THRIFT_BACKFILL, @@ -139,6 +148,8 @@ public class ConfigurationManagerTest { true, true, true, + MIN_REQUIRED_INSTANCES, + MAX_SLA_DURATION_SECS, ConfigurationManager.DEFAULT_ALLOWED_JOB_ENVIRONMENTS), TaskTestUtil.TIER_MANAGER, TaskTestUtil.THRIFT_BACKFILL, @@ -160,7 +171,7 @@ public class ConfigurationManagerTest { taskConfig.getContainer().getDocker().setImage(null); expectTaskDescriptionException("A container must specify an image"); - CONFIGURATION_MANAGER.validateAndPopulate(ITaskConfig.build(taskConfig)); + CONFIGURATION_MANAGER.validateAndPopulate(ITaskConfig.build(taskConfig), 100); } @Test @@ -169,7 +180,7 @@ public class ConfigurationManagerTest { taskConfig.getContainer().getDocker().addToParameters(new DockerParameter("foo", "bar")); expectTaskDescriptionException(ConfigurationManager.NO_DOCKER_PARAMETERS); - CONFIGURATION_MANAGER.validateAndPopulate(ITaskConfig.build(taskConfig)); + CONFIGURATION_MANAGER.validateAndPopulate(ITaskConfig.build(taskConfig), 100); } @Test @@ -179,7 +190,7 @@ public class ConfigurationManagerTest { builder.unsetExecutorConfig(); expectTaskDescriptionException(ConfigurationManager.EXECUTOR_REQUIRED_WITH_DOCKER); - CONFIGURATION_MANAGER.validateAndPopulate(ITaskConfig.build(builder)); + CONFIGURATION_MANAGER.validateAndPopulate(ITaskConfig.build(builder), 1); } @Test @@ -187,7 +198,7 @@ public class ConfigurationManagerTest { TaskConfig builder = CONFIG_WITH_CONTAINER.newBuilder(); builder.unsetExecutorConfig(); 
- DOCKER_CONFIGURATION_MANAGER.validateAndPopulate(ITaskConfig.build(builder)); + DOCKER_CONFIGURATION_MANAGER.validateAndPopulate(ITaskConfig.build(builder), 100); } @Test @@ -196,7 +207,7 @@ public class ConfigurationManagerTest { .setTier("pr/d")); expectTaskDescriptionException("Tier contains illegal characters"); - CONFIGURATION_MANAGER.validateAndPopulate(config); + CONFIGURATION_MANAGER.validateAndPopulate(config, 100); } @Test @@ -205,7 +216,7 @@ public class ConfigurationManagerTest { builder.getContainer().getDocker().setParameters(ImmutableList.of()); ITaskConfig result = - DOCKER_CONFIGURATION_MANAGER.validateAndPopulate(ITaskConfig.build(builder)); + DOCKER_CONFIGURATION_MANAGER.validateAndPopulate(ITaskConfig.build(builder), 100); // The resulting task config should contain parameters supplied to the ConfigurationManager. List<IDockerParameter> params = result.getContainer().getDocker().getParameters(); @@ -221,7 +232,7 @@ public class ConfigurationManagerTest { taskConfig.getContainer().getDocker().addToParameters(userParameter); ITaskConfig result = DOCKER_CONFIGURATION_MANAGER.validateAndPopulate( - ITaskConfig.build(taskConfig)); + ITaskConfig.build(taskConfig), 100); // The resulting task config should contain parameters supplied from user config. 
List<IDockerParameter> params = result.getContainer().getDocker().getParameters(); @@ -236,7 +247,7 @@ public class ConfigurationManagerTest { .setConstraint(TaskConstraint.value( new ValueConstraint(false, ImmutableSet.of(JOB_KEY.getRole() + "/f")))))); - CONFIGURATION_MANAGER.validateAndPopulate(ITaskConfig.build(builder)); + CONFIGURATION_MANAGER.validateAndPopulate(ITaskConfig.build(builder), 100); } @Test @@ -246,7 +257,7 @@ public class ConfigurationManagerTest { .setName(DEDICATED_ATTRIBUTE) .setConstraint(TaskConstraint.value(new ValueConstraint(false, ImmutableSet.of("*/f")))))); - CONFIGURATION_MANAGER.validateAndPopulate(ITaskConfig.build(builder)); + CONFIGURATION_MANAGER.validateAndPopulate(ITaskConfig.build(builder), 100); } @Test @@ -257,7 +268,7 @@ public class ConfigurationManagerTest { .setConstraint(TaskConstraint.value(new ValueConstraint(false, ImmutableSet.of("r/f")))))); expectTaskDescriptionException("Only r may use hosts dedicated for that role."); - CONFIGURATION_MANAGER.validateAndPopulate(ITaskConfig.build(builder)); + CONFIGURATION_MANAGER.validateAndPopulate(ITaskConfig.build(builder), 100); } @Test @@ -267,7 +278,7 @@ public class ConfigurationManagerTest { builder.addToResources(ramMb(72)); expectTaskDescriptionException("Multiple resource values are not supported for CPU, RAM"); - DOCKER_CONFIGURATION_MANAGER.validateAndPopulate(ITaskConfig.build(builder)); + DOCKER_CONFIGURATION_MANAGER.validateAndPopulate(ITaskConfig.build(builder), 100); } @Test @@ -275,7 +286,7 @@ public class ConfigurationManagerTest { TaskConfig builder = CONFIG_WITH_CONTAINER.newBuilder(); builder.addToResources(namedPort("thrift")); - DOCKER_CONFIGURATION_MANAGER.validateAndPopulate(ITaskConfig.build(builder)); + DOCKER_CONFIGURATION_MANAGER.validateAndPopulate(ITaskConfig.build(builder), 100); } @Test @@ -293,10 +304,12 @@ public class ConfigurationManagerTest { false, false, false, + MIN_REQUIRED_INSTANCES, + MAX_SLA_DURATION_SECS, 
ConfigurationManager.DEFAULT_ALLOWED_JOB_ENVIRONMENTS), TaskTestUtil.TIER_MANAGER, TaskTestUtil.THRIFT_BACKFILL, - TestExecutorSettings.THERMOS_EXECUTOR).validateAndPopulate(ITaskConfig.build(builder)); + TestExecutorSettings.THERMOS_EXECUTOR).validateAndPopulate(ITaskConfig.build(builder), 100); } @Test @@ -312,15 +325,18 @@ public class ConfigurationManagerTest { new ConfigurationManagerSettings( ALL_CONTAINER_TYPES, true, - ImmutableList.of(new DockerParameter("foo", "bar")), + ImmutableList.of(new DockerParameter("foo", "bar")), false, false, false, false, + MIN_REQUIRED_INSTANCES, + MAX_SLA_DURATION_SECS, ".+"), TaskTestUtil.TIER_MANAGER, TaskTestUtil.THRIFT_BACKFILL, - TestExecutorSettings.THERMOS_EXECUTOR).validateAndPopulate(ITaskConfig.build(builder)); + TestExecutorSettings.THERMOS_EXECUTOR) + .validateAndPopulate(ITaskConfig.build(builder), 100); } @Test @@ -333,7 +349,7 @@ public class ConfigurationManagerTest { expectTaskDescriptionException(NO_CONTAINER_VOLUMES); - CONFIGURATION_MANAGER.validateAndPopulate(ITaskConfig.build(builder)); + CONFIGURATION_MANAGER.validateAndPopulate(ITaskConfig.build(builder), 100); } @Test @@ -343,7 +359,7 @@ public class ConfigurationManagerTest { builder.unsetTaskLinks(); ITaskConfig populated = - DOCKER_CONFIGURATION_MANAGER.validateAndPopulate(ITaskConfig.build(builder)); + DOCKER_CONFIGURATION_MANAGER.validateAndPopulate(ITaskConfig.build(builder), 100); assertEquals(ImmutableSet.of("health", "http"), populated.getTaskLinks().keySet()); } @@ -361,6 +377,8 @@ public class ConfigurationManagerTest { true, true, true, + MIN_REQUIRED_INSTANCES, + MAX_SLA_DURATION_SECS, "b.r"), TaskTestUtil.TIER_MANAGER, TaskTestUtil.THRIFT_BACKFILL, @@ -368,6 +386,166 @@ public class ConfigurationManagerTest { .validateAndPopulate(IJobConfiguration.build(jobConfiguration)); } + @Test + public void testCountSlaPolicyUnsupportedTier() throws Exception { + ITaskConfig config = ITaskConfig.build( + CONFIG_WITH_CONTAINER + .newBuilder() + 
.setTier(TaskTestUtil.DEV_TIER_NAME) + .setSlaPolicy(SlaPolicy.countSlaPolicy( + new CountSlaPolicy() + .setCount(MIN_REQUIRED_INSTANCES)))); + + expectTaskDescriptionException("Tier 'tier-dev' does not support SlaPolicy."); + DOCKER_CONFIGURATION_MANAGER.validateAndPopulate( + config, + MIN_REQUIRED_INSTANCES); + } + + @Test + public void testCountSlaPolicyTooFewInstances() throws Exception { + ITaskConfig config = ITaskConfig.build( + CONFIG_WITH_CONTAINER + .newBuilder() + .setSlaPolicy(SlaPolicy.countSlaPolicy( + new CountSlaPolicy() + .setCount(MIN_REQUIRED_INSTANCES)))); + + expectTaskDescriptionException( + "Job with fewer than 20 instances cannot have Percentage/Count SlaPolicy."); + DOCKER_CONFIGURATION_MANAGER.validateAndPopulate( + config, + MIN_REQUIRED_INSTANCES - 1); + } + + @Test + public void testCountSlaPolicyCountTooHigh() throws Exception { + ITaskConfig config = ITaskConfig.build( + CONFIG_WITH_CONTAINER + .newBuilder() + .setSlaPolicy(SlaPolicy.countSlaPolicy( + new CountSlaPolicy() + .setCount(MIN_REQUIRED_INSTANCES)))); + + expectTaskDescriptionException( + "Current CountSlaPolicy: count=20 will not allow any instances to be killed. 
" + + "Must be less than instanceCount=20."); + DOCKER_CONFIGURATION_MANAGER.validateAndPopulate( + config, + MIN_REQUIRED_INSTANCES); + } + + @Test + public void testCountSlaPolicyDurationSecsTooHigh() throws Exception { + ITaskConfig config = ITaskConfig.build( + CONFIG_WITH_CONTAINER + .newBuilder() + .setSlaPolicy(SlaPolicy.countSlaPolicy( + new CountSlaPolicy() + .setDurationSecs(MAX_SLA_DURATION_SECS + 1)))); + + expectTaskDescriptionException("CountSlaPolicy: durationSecs=7201 must be less than " + + "cluster-wide maximum of 7200 secs."); + DOCKER_CONFIGURATION_MANAGER.validateAndPopulate( + config, + MIN_REQUIRED_INSTANCES); + } + + @Test + public void testPercentageSlaPolicyUnsupportedTier() throws Exception { + ITaskConfig config = ITaskConfig.build( + CONFIG_WITH_CONTAINER + .newBuilder() + .setTier(TaskTestUtil.DEV_TIER_NAME) + .setSlaPolicy(SlaPolicy.percentageSlaPolicy( + new PercentageSlaPolicy() + .setPercentage(DEFAULT_SLA_PERCENTAGE)))); + + expectTaskDescriptionException("Tier 'tier-dev' does not support SlaPolicy."); + DOCKER_CONFIGURATION_MANAGER.validateAndPopulate( + config, + MIN_REQUIRED_INSTANCES); + } + + @Test + public void testPercentageSlaPolicyTooFewInstances() throws Exception { + ITaskConfig config = ITaskConfig.build( + CONFIG_WITH_CONTAINER + .newBuilder() + .setSlaPolicy(SlaPolicy.percentageSlaPolicy( + new PercentageSlaPolicy() + .setPercentage(DEFAULT_SLA_PERCENTAGE)))); + + expectTaskDescriptionException( + "Job with fewer than 20 instances cannot have Percentage/Count SlaPolicy."); + DOCKER_CONFIGURATION_MANAGER.validateAndPopulate( + config, + MIN_REQUIRED_INSTANCES - 1); + } + + @Test + public void testPercentageSlaPolicyPercentageTooHigh() throws Exception { + ITaskConfig config = ITaskConfig.build( + CONFIG_WITH_CONTAINER + .newBuilder() + .setSlaPolicy(SlaPolicy.percentageSlaPolicy( + new PercentageSlaPolicy() + .setPercentage(100)))); + + expectTaskDescriptionException( + "Current PercentageSlaPolicy: 
percentage=100.000000 will not allow any instances " + + "to be killed. Must be less than 95.000000."); + DOCKER_CONFIGURATION_MANAGER.validateAndPopulate( + config, + MIN_REQUIRED_INSTANCES); + } + + @Test + public void testPercentageSlaPolicyDurationSecsTooHigh() throws Exception { + ITaskConfig config = ITaskConfig.build( + CONFIG_WITH_CONTAINER + .newBuilder() + .setSlaPolicy(SlaPolicy.percentageSlaPolicy( + new PercentageSlaPolicy() + .setDurationSecs(MAX_SLA_DURATION_SECS + 1)))); + + expectTaskDescriptionException("PercentageSlaPolicy: durationSecs=7201 must be less than " + + "cluster-wide maximum of 7200 secs."); + DOCKER_CONFIGURATION_MANAGER.validateAndPopulate( + config, + MIN_REQUIRED_INSTANCES); + } + + @Test + public void testCoordinatorSlaPolicyUnsupportedTier() throws Exception { + ITaskConfig config = ITaskConfig.build( + CONFIG_WITH_CONTAINER + .newBuilder() + .setTier(TaskTestUtil.DEV_TIER_NAME) + .setSlaPolicy(SlaPolicy.coordinatorSlaPolicy( + new CoordinatorSlaPolicy() + .setCoordinatorUrl("http://localhost")))); + + expectTaskDescriptionException("Tier 'tier-dev' does not support SlaPolicy."); + DOCKER_CONFIGURATION_MANAGER.validateAndPopulate( + config, + MIN_REQUIRED_INSTANCES); + } + + @Test + public void testCoordinatorSlaPolicyTooFewInstances() throws Exception { + ITaskConfig config = ITaskConfig.build( + CONFIG_WITH_CONTAINER + .newBuilder() + .setSlaPolicy(SlaPolicy.coordinatorSlaPolicy( + new CoordinatorSlaPolicy() + .setCoordinatorUrl("http://localhost")))); + + DOCKER_CONFIGURATION_MANAGER.validateAndPopulate( + config, + MIN_REQUIRED_INSTANCES - 1); + } + private void expectTaskDescriptionException(String message) { expectedException.expect(TaskDescriptionException.class); expectedException.expectMessage(message); http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java ---------------------------------------------------------------------- diff --git 
a/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java b/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java index 0fabb33..c9ad0cf 100644 --- a/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java +++ b/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java @@ -66,7 +66,7 @@ public class CronIT extends EasyMockTest { new JobConfiguration() .setCronSchedule(CRONTAB_ENTRY.toString()) .setKey(JOB_KEY.newBuilder()) - .setInstanceCount(2) + .setInstanceCount(20) .setOwner(IDENTITY) .setCronCollisionPolicy(KILL_EXISTING) .setTaskConfig(makeTaskConfig())); http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/test/java/org/apache/aurora/scheduler/cron/quartz/QuartzTestUtil.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/cron/quartz/QuartzTestUtil.java b/src/test/java/org/apache/aurora/scheduler/cron/quartz/QuartzTestUtil.java index b7dcf3a..e81c3fa 100644 --- a/src/test/java/org/apache/aurora/scheduler/cron/quartz/QuartzTestUtil.java +++ b/src/test/java/org/apache/aurora/scheduler/cron/quartz/QuartzTestUtil.java @@ -37,7 +37,7 @@ final class QuartzTestUtil { static final IJobConfiguration JOB = IJobConfiguration.build( new JobConfiguration() .setCronSchedule("* * * * SUN") - .setInstanceCount(10) + .setInstanceCount(20) .setOwner(new Identity().setUser("user")) .setKey(AURORA_JOB_KEY.newBuilder()) .setTaskConfig(TaskTestUtil.makeConfig(AURORA_JOB_KEY) http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/test/java/org/apache/aurora/scheduler/maintenance/MaintenanceControllerImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/maintenance/MaintenanceControllerImplTest.java b/src/test/java/org/apache/aurora/scheduler/maintenance/MaintenanceControllerImplTest.java new file mode 100644 index 0000000..28c62a1 --- /dev/null +++ 
b/src/test/java/org/apache/aurora/scheduler/maintenance/MaintenanceControllerImplTest.java @@ -0,0 +1,422 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.maintenance; + +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.Executor; + +import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.inject.AbstractModule; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.TypeLiteral; + +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Time; +import org.apache.aurora.common.stats.StatsProvider; +import org.apache.aurora.common.testing.easymock.EasyMockTest; +import org.apache.aurora.gen.CountSlaPolicy; +import org.apache.aurora.gen.HostAttributes; +import org.apache.aurora.gen.HostMaintenanceRequest; +import org.apache.aurora.gen.HostStatus; +import org.apache.aurora.gen.MaintenanceMode; +import org.apache.aurora.gen.PercentageSlaPolicy; +import org.apache.aurora.gen.ScheduledTask; +import org.apache.aurora.gen.SlaPolicy; +import org.apache.aurora.scheduler.SchedulerModule.TaskEventBatchWorker; +import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor; +import org.apache.aurora.scheduler.base.Query; +import org.apache.aurora.scheduler.base.TaskTestUtil; +import org.apache.aurora.scheduler.config.types.TimeAmount; +import 
org.apache.aurora.scheduler.events.EventSink; +import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; +import org.apache.aurora.scheduler.events.PubsubEventModule; +import org.apache.aurora.scheduler.sla.SlaManager; +import org.apache.aurora.scheduler.state.PubsubTestUtil; +import org.apache.aurora.scheduler.state.StateManager; +import org.apache.aurora.scheduler.storage.Storage; +import org.apache.aurora.scheduler.storage.entities.IHostAttributes; +import org.apache.aurora.scheduler.storage.entities.IHostMaintenanceRequest; +import org.apache.aurora.scheduler.storage.entities.IHostStatus; +import org.apache.aurora.scheduler.storage.entities.IScheduledTask; +import org.apache.aurora.scheduler.storage.entities.ISlaPolicy; +import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; +import org.apache.aurora.scheduler.testing.FakeStatsProvider; +import org.apache.mesos.v1.Protos; +import org.junit.Before; +import org.junit.Test; + +import static org.apache.aurora.gen.MaintenanceMode.DRAINED; +import static org.apache.aurora.gen.MaintenanceMode.DRAINING; +import static org.apache.aurora.gen.MaintenanceMode.NONE; +import static org.apache.aurora.gen.MaintenanceMode.SCHEDULED; +import static org.apache.aurora.gen.ScheduleStatus.KILLED; +import static org.apache.aurora.gen.ScheduleStatus.RUNNING; +import static org.apache.aurora.scheduler.testing.BatchWorkerUtil.expectBatchExecute; +import static org.easymock.EasyMock.anyObject; +import static org.easymock.EasyMock.eq; +import static org.easymock.EasyMock.expect; +import static org.junit.Assert.assertEquals; + +public class MaintenanceControllerImplTest extends EasyMockTest { + + private static final String HOST_A = "a"; + private static final Set<String> A = ImmutableSet.of(HOST_A); + private static final Protos.OfferID OFFER_ID = Protos.OfferID.newBuilder() + .setValue("offer-id") + .build(); + private static final Protos.AgentID AGENT_ID = Protos.AgentID.newBuilder() + .setValue("agent-id") + 
.build(); + private static final Protos.FrameworkID FRAMEWORK_ID = Protos.FrameworkID.newBuilder() + .setValue("framework-id") + .build(); + private static final Protos.URL AGENT_URL = Protos.URL.newBuilder() + .setAddress(Protos.Address.newBuilder() + .setHostname(HOST_A) + .setPort(5051)) + .setScheme("http") + .build(); + private static final Protos.Unavailability UNAVAILABILITY = Protos.Unavailability.newBuilder() + .setStart(Protos.TimeInfo.newBuilder() + .setNanoseconds(Amount.of(1L, Time.MINUTES).as(Time.NANOSECONDS))) + .build(); + private static final SlaPolicy SLA_POLICY = SlaPolicy.percentageSlaPolicy( + new PercentageSlaPolicy(95, 1800)); + + private static final Protos.InverseOffer INVERSE_OFFER = Protos.InverseOffer.newBuilder() + .setId(OFFER_ID) + .setAgentId(AGENT_ID) + .setUrl(AGENT_URL) + .setFrameworkId(FRAMEWORK_ID) + .setUnavailability(UNAVAILABILITY) + .build(); + + private static final SlaPolicy COUNT_SLA_POLICY = SlaPolicy.countSlaPolicy( + new CountSlaPolicy() + .setCount(2) + .setDurationSecs(1800) + ); + + private StorageTestUtil storageUtil; + private StateManager stateManager; + private SlaManager slaManager; + private MaintenanceController.MaintenanceControllerImpl maintenance; + private EventSink eventSink; + + @Before + public void setUp() throws Exception { + storageUtil = new StorageTestUtil(this); + storageUtil.expectOperations(); + stateManager = createMock(StateManager.class); + slaManager = createMock(SlaManager.class); + TaskEventBatchWorker batchWorker = createMock(TaskEventBatchWorker.class); + expectBatchExecute(batchWorker, storageUtil.storage, control).anyTimes(); + + Injector injector = Guice.createInjector( + new PubsubEventModule(), + new MaintenanceModule(new MaintenanceModule.Options()), + new AbstractModule() { + @Override + protected void configure() { + bind(Storage.class).toInstance(storageUtil.storage); + bind(StateManager.class).toInstance(stateManager); + bind(SlaManager.class).toInstance(slaManager); + 
bind(StatsProvider.class).toInstance(new FakeStatsProvider()); + bind(Executor.class).annotatedWith(AsyncExecutor.class) + .toInstance(MoreExecutors.directExecutor()); + bind(TaskEventBatchWorker.class).toInstance(batchWorker); + bind(new TypeLiteral<Amount<Long, Time>>() { }) + .annotatedWith( + MaintenanceController.MaintenanceControllerImpl.PollingInterval.class) + .toInstance(new TimeAmount(1, Time.MINUTES)); + } + }); + maintenance = injector.getInstance(MaintenanceController.MaintenanceControllerImpl.class); + eventSink = PubsubTestUtil.startPubsub(injector); + } + + private static IScheduledTask makeTask(String host, String taskId) { + ScheduledTask builder = TaskTestUtil.addStateTransition( + TaskTestUtil.makeTask(taskId, TaskTestUtil.JOB), + RUNNING, + 1000).newBuilder(); + builder.getAssignedTask().setSlaveHost(host); + return IScheduledTask.build(builder); + } + + @Test + public void testMaintenanceCycle() { + IScheduledTask task1 = makeTask(HOST_A, "taskA"); + IScheduledTask task2 = makeTask(HOST_A, "taskB"); + + expectMaintenanceModeChange(HOST_A, SCHEDULED); + expectFetchTasksByHost(HOST_A, ImmutableSet.of(task1, task2)); + expectTaskDraining(task1); + expectTaskDraining(task2); + expectMaintenanceModeChange(HOST_A, DRAINING); + IHostAttributes attributes = + IHostAttributes.build(new HostAttributes().setHost(HOST_A).setMode(DRAINING)); + IHostMaintenanceRequest maintenanceRequest = + IHostMaintenanceRequest.build(new HostMaintenanceRequest() + .setHost(HOST_A) + .setCreatedTimestampMs(System.currentTimeMillis()) + .setDefaultSlaPolicy(SLA_POLICY)); + + storageUtil.hostMaintenanceStore.saveHostMaintenanceRequest( + anyObject(IHostMaintenanceRequest.class)); + expect(storageUtil.attributeStore.getHostAttributes(HOST_A)) + .andReturn(Optional.of(attributes)).times(2); + expect(storageUtil.hostMaintenanceStore.getHostMaintenanceRequest(HOST_A)) + .andReturn(Optional.of(maintenanceRequest)).times(2); + + 
expect(storageUtil.attributeStore.getHostAttributes()).andReturn(ImmutableSet.of(attributes)); + expectFetchTasksByHost(HOST_A, ImmutableSet.of(task2)); + // TaskA is KILLED and therefore no longer active + expectFetchTasksByHost(HOST_A, ImmutableSet.of()); + expectMaintenanceModeChange(HOST_A, DRAINED); + expectMaintenanceModeChange(HOST_A, NONE); + storageUtil.hostMaintenanceStore.removeHostMaintenanceRequest(HOST_A); + + control.replay(); + + assertStatus(HOST_A, SCHEDULED, maintenance.startMaintenance(A)); + assertStatus(HOST_A, DRAINING, maintenance.drain(A)); + assertStatus(HOST_A, DRAINING, maintenance.getStatus(A)); + eventSink.post( + TaskStateChange.transition( + IScheduledTask.build(task1.newBuilder().setStatus(KILLED)), RUNNING)); + eventSink.post( + TaskStateChange.transition( + IScheduledTask.build(task2.newBuilder().setStatus(KILLED)), RUNNING)); + assertStatus(HOST_A, NONE, maintenance.endMaintenance(A)); + } + + @Test + public void testUnknownHost() { + expect(storageUtil.attributeStore.getHostAttributes("b")) + .andReturn(Optional.empty()); + + control.replay(); + + assertEquals(ImmutableSet.of(), + maintenance.startMaintenance(ImmutableSet.of("b"))); + } + + @Test + public void testDrainEmptyHost() { + storageUtil.hostMaintenanceStore.saveHostMaintenanceRequest( + anyObject(IHostMaintenanceRequest.class)); + expectMaintenanceModeChange(HOST_A, SCHEDULED); + expectFetchTasksByHost(HOST_A, ImmutableSet.of()); + expectMaintenanceModeChange(HOST_A, DRAINED); + + control.replay(); + + assertStatus(HOST_A, SCHEDULED, maintenance.startMaintenance(A)); + assertStatus(HOST_A, DRAINED, maintenance.drain(A)); + } + + @Test + public void testEndEarly() { + expectMaintenanceModeChange(HOST_A, SCHEDULED); + expectMaintenanceModeChange(HOST_A, NONE); + expect(storageUtil.attributeStore.getHostAttributes(HOST_A)).andReturn(Optional.of( + IHostAttributes.build(new HostAttributes().setHost(HOST_A).setMode(NONE)))); + 
storageUtil.hostMaintenanceStore.removeHostMaintenanceRequest(HOST_A); + + control.replay(); + + assertStatus(HOST_A, SCHEDULED, maintenance.startMaintenance(A)); + + // End maintenance without DRAINING. + assertStatus(HOST_A, NONE, maintenance.endMaintenance(A)); + + // Make sure a later transition on the host does not cause any ill effects that could surface + // from stale internal state. + eventSink.post(TaskStateChange.transition( + IScheduledTask.build(makeTask(HOST_A, "taskA").newBuilder().setStatus(KILLED)), RUNNING)); + } + + @Test + public void testSlaDrain() { + storageUtil.hostMaintenanceStore.saveHostMaintenanceRequest( + anyObject(IHostMaintenanceRequest.class)); + expectMaintenanceModeChange(HOST_A, DRAINING); + + control.replay(); + + assertStatus( + HOST_A, + DRAINING, + maintenance.slaDrain(ImmutableSet.of(HOST_A), COUNT_SLA_POLICY, 1800)); + } + + @Test + public void testSlaDrainUnknownHost() { + storageUtil.hostMaintenanceStore.saveHostMaintenanceRequest( + anyObject(IHostMaintenanceRequest.class)); + expect(storageUtil.attributeStore.getHostAttributes("unknown")) + .andReturn(Optional.empty()); + + control.replay(); + + assertEquals(ImmutableSet.of(), + maintenance.slaDrain(ImmutableSet.of("unknown"), COUNT_SLA_POLICY, 1800)); + } + + @Test + public void testIteration() { + IScheduledTask task1 = makeTask(HOST_A, "taskA"); + IScheduledTask task2 = makeTask(HOST_A, "taskB"); + + IHostAttributes attributes = + IHostAttributes.build(new HostAttributes().setHost(HOST_A).setMode(DRAINING)); + expect(storageUtil.attributeStore.getHostAttributes()) + .andReturn(ImmutableSet.of(attributes)); + expectFetchTasksByHost(HOST_A, ImmutableSet.of(task1, task2)); + IHostMaintenanceRequest maintenanceRequest = + IHostMaintenanceRequest.build(new HostMaintenanceRequest() + .setHost(HOST_A) + .setCreatedTimestampMs(System.currentTimeMillis()) + .setDefaultSlaPolicy(SLA_POLICY)); + expect(storageUtil.hostMaintenanceStore.getHostMaintenanceRequest(HOST_A)) + 
.andReturn(Optional.of(maintenanceRequest)).times(2); + expectTaskDraining(task1); + expectTaskDraining(task2); + + control.replay(); + + maintenance.runForTest(); + } + + @Test + public void testIterationEmptyHost() { + IHostAttributes attributes = + IHostAttributes.build(new HostAttributes().setHost(HOST_A).setMode(DRAINING)); + expect(storageUtil.attributeStore.getHostAttributes()) + .andReturn(ImmutableSet.of(attributes)); + expectFetchTasksByHost(HOST_A, ImmutableSet.of()); + expectMaintenanceModeChange(HOST_A, DRAINED); + + control.replay(); + + maintenance.runForTest(); + } + + @Test + public void testIterationMaintenanceTimeout() { + IScheduledTask task1 = makeTask(HOST_A, "taskA"); + IScheduledTask task2 = makeTask(HOST_A, "taskB"); + + IHostAttributes attributes = + IHostAttributes.build(new HostAttributes().setHost(HOST_A).setMode(DRAINING)); + expect(storageUtil.attributeStore.getHostAttributes()) + .andReturn(ImmutableSet.of(attributes)); + expectFetchTasksByHost(HOST_A, ImmutableSet.of(task1, task2)); + IHostMaintenanceRequest maintenanceRequest = + IHostMaintenanceRequest.build(new HostMaintenanceRequest() + .setHost(HOST_A) + .setCreatedTimestampMs(0) + .setDefaultSlaPolicy(SLA_POLICY)); + expect(storageUtil.hostMaintenanceStore.getHostMaintenanceRequest(HOST_A)) + .andReturn(Optional.of(maintenanceRequest)).times(2); + expectTaskDraining(task1, true); + expectTaskDraining(task2, true); + + control.replay(); + + maintenance.runForTest(); + } + + @Test + public void testIterationNoMaintenanceRequest() { + IScheduledTask task1 = makeTask(HOST_A, "taskA"); + IScheduledTask task2 = makeTask(HOST_A, "taskB"); + + IHostAttributes attributes = + IHostAttributes.build(new HostAttributes().setHost(HOST_A).setMode(DRAINING)); + expect(storageUtil.attributeStore.getHostAttributes()) + .andReturn(ImmutableSet.of(attributes)); + expectFetchTasksByHost(HOST_A, ImmutableSet.of(task1, task2)); + 
expect(storageUtil.hostMaintenanceStore.getHostMaintenanceRequest(HOST_A)) + .andReturn(Optional.empty()).times(2); + + control.replay(); + + maintenance.runForTest(); + } + + @Test + public void testGetMode() { + expect(storageUtil.attributeStore.getHostAttributes(HOST_A)).andReturn(Optional.of( + IHostAttributes.build(new HostAttributes().setHost(HOST_A).setMode(DRAINING)))); + expect(storageUtil.attributeStore.getHostAttributes("unknown")).andReturn(Optional.empty()); + + control.replay(); + + assertEquals(DRAINING, maintenance.getMode(HOST_A)); + assertEquals(NONE, maintenance.getMode("unknown")); + } + + @Test + public void testInverseOfferDrain() { + IScheduledTask task1 = makeTask(HOST_A, "taskA"); + storageUtil.hostMaintenanceStore.saveHostMaintenanceRequest( + anyObject(IHostMaintenanceRequest.class)); + expectFetchTasksByHost(HOST_A, ImmutableSet.of(task1)); + expectTaskDraining(task1); + IHostMaintenanceRequest maintenanceRequest = + IHostMaintenanceRequest.build(new HostMaintenanceRequest() + .setHost(HOST_A) + .setCreatedTimestampMs(System.currentTimeMillis()) + .setDefaultSlaPolicy(SLA_POLICY)); + expect(storageUtil.hostMaintenanceStore.getHostMaintenanceRequest(HOST_A)) + .andReturn(Optional.of(maintenanceRequest)).times(1); + + control.replay(); + maintenance.drainForInverseOffer(INVERSE_OFFER); + } + + private void expectTaskDraining(IScheduledTask task) { + expectTaskDraining(task, false); + } + + private void expectTaskDraining(IScheduledTask task, boolean force) { + slaManager.checkSlaThenAct( + eq(task), + eq(ISlaPolicy.build(SLA_POLICY)), + anyObject(Storage.MutateWork.class), + eq(force)); + } + + private void expectFetchTasksByHost(String hostName, Set<IScheduledTask> tasks) { + expect(storageUtil.taskStore.fetchTasks(Query.slaveScoped(hostName).active())).andReturn(tasks); + } + + private void expectMaintenanceModeChange(String hostName, MaintenanceMode mode) { + IHostAttributes attributes = IHostAttributes.build(new 
HostAttributes().setHost(hostName)); + + expect(storageUtil.attributeStore.getHostAttributes(hostName)) + .andReturn(Optional.of(attributes)); + IHostAttributes updated = IHostAttributes.build(attributes.newBuilder().setMode(mode)); + expect(storageUtil.attributeStore.saveHostAttributes(updated)).andReturn(true); + } + + private void assertStatus(String host, MaintenanceMode mode, Set<IHostStatus> statuses) { + assertEquals(ImmutableSet.of(IHostStatus.build(new HostStatus(host, mode))), statuses); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java b/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java index c6163bb..8a8c695 100644 --- a/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java +++ b/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java @@ -36,9 +36,9 @@ import org.apache.aurora.scheduler.base.Conversions; import org.apache.aurora.scheduler.base.SchedulerException; import org.apache.aurora.scheduler.events.EventSink; import org.apache.aurora.scheduler.events.PubsubEvent; +import org.apache.aurora.scheduler.maintenance.MaintenanceController; import org.apache.aurora.scheduler.offers.HostOffer; import org.apache.aurora.scheduler.offers.OfferManager; -import org.apache.aurora.scheduler.state.MaintenanceController; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
