Enable `Tasks` to specify their own custom maintenance SLA. `Tasks` can specify custom SLA requirements as part of their `TaskConfig`. One of the new features is the ability to specify an external coordinator that can ACK/NACK maintenance requests for tasks. This will be hugely beneficial for onboarding services that cannot satisfactorily specify SLA in terms of running instances.
Maintenance requests are driven from the Scheduler to improve management of nodes in the cluster. Testing Done: ./build-support/jenkins/build.sh ./src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh Bugs closed: AURORA-1978 Reviewed at https://reviews.apache.org/r/66716/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/f2acf53f Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/f2acf53f Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/f2acf53f Branch: refs/heads/master Commit: f2acf53ff0d0cbb7b0c62f021cc55c9e45b32f9c Parents: 34be631 Author: Santhosh Kumar Shanmugham <santhoshkuma...@gmail.com> Authored: Tue Jun 5 16:15:52 2018 -0700 Committer: Santhosh Kumar <sshanmug...@twitter.com> Committed: Tue Jun 5 16:15:52 2018 -0700 ---------------------------------------------------------------------- RELEASE-NOTES.md | 32 + .../thrift/org/apache/aurora/gen/api.thrift | 6 + docs/README.md | 1 + docs/features/sla-requirements.md | 181 +++ docs/operations/configuration.md | 55 +- docs/reference/configuration.md | 34 +- docs/reference/scheduler-configuration.md | 10 + .../vagrant/systemd/aurora-scheduler.service | 6 +- .../apache/aurora/scheduler/app/AppModule.java | 5 + .../aurora/scheduler/config/CliOptions.java | 2 + .../configuration/ConfigurationManager.java | 105 +- .../maintenance/MaintenanceController.java | 470 +++++++ .../maintenance/MaintenanceModule.java | 78 ++ .../scheduler/mesos/MesosCallbackHandler.java | 2 +- .../apache/aurora/scheduler/sla/SlaManager.java | 437 ++++++ .../apache/aurora/scheduler/sla/SlaModule.java | 57 +- .../scheduler/state/MaintenanceController.java | 294 ----- .../aurora/scheduler/state/StateModule.java | 12 - .../scheduler/thrift/ReadOnlySchedulerImpl.java | 10 +- .../thrift/SchedulerThriftInterface.java | 24 +- .../python/apache/aurora/admin/admin_util.py | 38 + .../apache/aurora/admin/host_maintenance.py | 20 + 
.../python/apache/aurora/admin/maintenance.py | 45 + .../python/apache/aurora/client/api/__init__.py | 12 + .../python/apache/aurora/client/cli/context.py | 16 + .../python/apache/aurora/client/cli/jobs.py | 7 + .../aurora/scheduler/app/SchedulerIT.java | 9 + .../aurora/scheduler/base/TaskTestUtil.java | 4 +- .../scheduler/config/CommandLineTest.java | 14 +- .../configuration/ConfigurationManagerTest.java | 212 ++- .../aurora/scheduler/cron/quartz/CronIT.java | 2 +- .../scheduler/cron/quartz/QuartzTestUtil.java | 2 +- .../MaintenanceControllerImplTest.java | 422 ++++++ .../mesos/MesosCallbackHandlerTest.java | 2 +- .../aurora/scheduler/sla/SlaManagerTest.java | 1239 ++++++++++++++++++ .../aurora/scheduler/sla/SlaModuleTest.java | 15 +- .../state/MaintenanceControllerImplTest.java | 264 ---- .../thrift/SchedulerThriftInterfaceTest.java | 58 +- .../aurora/scheduler/thrift/ThriftIT.java | 12 + .../thrift/aop/MockDecoratedThrift.java | 7 + .../apache/aurora/admin/test_maintenance.py | 81 +- src/test/python/apache/aurora/api_util.py | 3 + .../aurora/client/api/test_scheduler_client.py | 9 + .../python/apache/aurora/client/cli/test_add.py | 22 +- .../apache/aurora/client/cli/test_kill.py | 52 +- .../sh/org/apache/aurora/e2e/http_example.py | 2 +- .../apache/aurora/e2e/partition_aware.aurora | 14 + .../sh/org/apache/aurora/e2e/sla_coordinator.py | 60 + .../sh/org/apache/aurora/e2e/sla_policy.aurora | 61 + .../sh/org/apache/aurora/e2e/test_end_to_end.sh | 135 +- ui/src/main/js/components/TaskConfigSummary.js | 42 + 51 files changed, 4058 insertions(+), 644 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/RELEASE-NOTES.md ---------------------------------------------------------------------- diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md index 5e1f994..0ef75d6 100644 --- a/RELEASE-NOTES.md +++ b/RELEASE-NOTES.md @@ -1,3 +1,35 @@ +0.21.0 +====== + +### New/updated: +- Introduce 
ability for tasks to specify custom SLA requirements via the new `SlaPolicy` structs. + There are 3 different SLA Policies that are currently supported - `CountSlaPolicy`, + `PercentageSlaPolicy` and `CoordinatorSlaPolicy`. SLA policies based on count and percentage + express the required number of `RUNNING` instances as either a count or percentage in addition to + allowing the duration-window for which these requirements have to be satisfied. For applications + that need more control over how SLA is determined, a custom SLA calculator can be configured a.k.a + Coordinator. Any action that can affect SLA, will first check with the Coordinator before + proceeding. + + **IMPORTANT: The storage changes required for this feature will make scheduler + snapshot backwards incompatible. Scheduler will be unable to read snapshot if rolled back to + previous version. If rollback is absolutely necessary, perform the following steps:** + 1. Stop all host maintenance requests via `aurora_admin host_activate`. + 2. Ensure a new snapshot is created by running `aurora_admin scheduler_snapshot <cluster>` + 3. Rollback to previous version + + Note: The `Coordinator` interface required for the `CoordinatorSlaPolicy` is experimental at + this juncture and is bound to change in the future. + +### Deprecations and removals: + +- Deprecated the `aurora_admin host_drain` command used for maintenance. With this release the SLA + computations are moved to the scheduler and it is no longer required for the client to compute + SLAs and watch the drains. The scheduler persists any host maintenance request and performs + SLA-aware drain of the tasks, before marking the host as `DRAINED`. So maintenance requests + survive across scheduler fail-overs. Use the newly introduced `aurora_admin sla_host_drain` + to skip the SLA computations on the admin client. 
+ 0.20.0 ====== http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/api/src/main/thrift/org/apache/aurora/gen/api.thrift ---------------------------------------------------------------------- diff --git a/api/src/main/thrift/org/apache/aurora/gen/api.thrift b/api/src/main/thrift/org/apache/aurora/gen/api.thrift index ff48000..54007c8 100644 --- a/api/src/main/thrift/org/apache/aurora/gen/api.thrift +++ b/api/src/main/thrift/org/apache/aurora/gen/api.thrift @@ -1244,6 +1244,12 @@ service AuroraAdmin extends AuroraSchedulerManager { /** Set the given hosts back into serving mode. */ Response endMaintenance(1: Hosts hosts) + /** + * Ask scheduler to put hosts into DRAINING mode and move scheduled tasks off of the hosts + * such that its SLA requirements are satisfied. Use defaultSlaPolicy if it is not set for a task. + **/ + Response slaDrainHosts(1: Hosts hosts, 2: SlaPolicy defaultSlaPolicy, 3: i64 timeoutSecs) + /** Start a storage snapshot and block until it completes. */ Response snapshot() http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/docs/README.md ---------------------------------------------------------------------- diff --git a/docs/README.md b/docs/README.md index 166bf1c..8bf936d 100644 --- a/docs/README.md +++ b/docs/README.md @@ -28,6 +28,7 @@ Description of important Aurora features. 
* [Services](features/services.md) * [Service Discovery](features/service-discovery.md) * [SLA Metrics](features/sla-metrics.md) + * [SLA Requirements](features/sla-requirements.md) * [Webhooks](features/webhooks.md) ## Operators http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/docs/features/sla-requirements.md ---------------------------------------------------------------------- diff --git a/docs/features/sla-requirements.md b/docs/features/sla-requirements.md new file mode 100644 index 0000000..555b174 --- /dev/null +++ b/docs/features/sla-requirements.md @@ -0,0 +1,181 @@ +SLA Requirements +================ + +- [Overview](#overview) +- [Default SLA](#default-sla) +- [Custom SLA](#custom-sla) + - [Count-based](#count-based) + - [Percentage-based](#percentage-based) + - [Coordinator-based](#coordinator-based) + +## Overview + +Aurora guarantees SLA requirements for jobs. These requirements limit the impact of cluster-wide +maintenance operations on the jobs. For instance, when an operator upgrades +the OS on all the Mesos agent machines, the tasks scheduled on them need to be drained. +By specifying the SLA requirements a job can make sure that it has enough instances to +continue operating safely without incurring downtime. + +> SLA is defined as the minimum number of active tasks required for a job every duration window. +A task is active if it was in the `RUNNING` state during the last duration window. + +There is a [default](#default-sla) SLA guarantee for +[preferred](../features/multitenancy.md#configuration-tiers) tier jobs and it is also possible to +specify [custom](#custom-sla) SLA requirements. + +## Default SLA + +Aurora guarantees a default SLA requirement for tasks in the +[preferred](../features/multitenancy.md#configuration-tiers) tier. + +> 95% of tasks in a job will be `active` for every 30 mins. 
+ + +## Custom SLA + +For jobs that require different SLA requirements, Aurora allows jobs to specify their own +SLA requirements via the `SlaPolicies`. There are 3 different ways to express SLA requirements. + +### [Count-based](../reference/configuration.md#countslapolicy-objects) + +For jobs that need a minimum `number` of instances to be running all the time, +[`CountSlaPolicy`](../reference/configuration.md#countslapolicy-objects) +provides the ability to express the minimum number of required active instances (i.e. number of +tasks that are `RUNNING` for at least `duration_secs`). For instance, if we have a +`replicated-service` that has 3 instances and needs at least 2 instances every 30 minutes to be +treated as healthy, the SLA requirement can be expressed with a +[`CountSlaPolicy`](../reference/configuration.md#countslapolicy-objects) like below, + +```python +Job( + name = 'replicated-service', + role = 'www-data', + instances = 3, + sla_policy = CountSlaPolicy( + count = 2, + duration_secs = 1800 + ) + ... +) +``` + +### [Percentage-based](../reference/configuration.md#percentageslapolicy-objects) + +For jobs that need a minimum `percentage` of instances to be running all the time, +[`PercentageSlaPolicy`](../reference/configuration.md#percentageslapolicy-objects) provides the +ability to express the minimum percentage of required active instances (i.e. percentage of tasks +that are `RUNNING` for at least `duration_secs`). For instance, if we have a `frontend-service` that +has 10000 instances for handling peak load and cannot have more than 0.1% of the instances down +for every 1 hr, the SLA requirement can be expressed with a +[`PercentageSlaPolicy`](../reference/configuration.md#percentageslapolicy-objects) like below, + +```python +Job( + name = 'frontend-service', + role = 'www-data', + instances = 10000, + sla_policy = PercentageSlaPolicy( + percentage = 99.9, + duration_secs = 3600 + ) + ... 
+) +``` + +### [Coordinator-based](../reference/configuration.md#coordinatorslapolicy-objects) + +When none of the above methods are enough to describe the SLA requirements for a job, then the SLA +calculation can be off-loaded to a custom service called the `Coordinator`. The `Coordinator` needs +to expose an endpoint that will be called to check if removal of a task will affect the SLA +requirements for the job. This is useful to control the number of tasks that undergo maintenance +at a time, without affecting the SLA for the application. + +Consider an example where we have a `storage-service` that stores 2 replicas of an object. Each replica +is distributed across the instances, such that replicas are stored on different hosts. In addition, +a consistent-hash is used for distributing the data across the instances. + +When an instance needs to be drained (say for host-maintenance), we have to make sure that at least 1 of +the 2 replicas remains available. In such a case, a `Coordinator` service can be used to maintain +the SLA guarantees required for the job. + +The job can be configured with a +[`CoordinatorSlaPolicy`](../reference/configuration.md#coordinatorslapolicy-objects) to specify the +coordinator endpoint and the field in the response JSON that indicates if the SLA will be affected +or not affected, when the task is removed. + +```python +Job( + name = 'storage-service', + role = 'www-data', + sla_policy = CoordinatorSlaPolicy( + coordinator_url = 'http://coordinator.example.com', + status_key = 'drain' + ) + ... +) +``` + + +#### Coordinator Interface [Experimental] + +When a [`CoordinatorSlaPolicy`](../reference/configuration.md#coordinatorslapolicy-objects) is +specified for a job, any action that requires removing a task +(such as drains) will be required to get approval from the `Coordinator` before proceeding. 
The +coordinator service needs to expose an HTTP endpoint that can take a `task-key` param +(`<cluster>/<role>/<env>/<name>/<instance>`) and a json body describing the task +details and return a response json that will contain the boolean status for allowing or disallowing +the task's removal. + +##### Request: +```javascript +POST / + ?task=<cluster>/<role>/<env>/<name>/<instance> + +{ + "assignedTask": { + "taskId": "taskA", + "slaveHost": "a", + "task": { + "job": { + "role": "role", + "environment": "devel", + "name": "job" + }, + ... + }, + "assignedPorts": { + "http": 1000 + }, + "instanceId": 1 + ... + }, + ... +} +``` + +##### Response: +```json +{ + "drain": true +} +``` + +If Coordinator allows removal of the task, then the task's +[termination lifecycle](../reference/configuration.md#httplifecycleconfig-objects) +is triggered. If Coordinator does not allow removal, then the request will be retried again in the +future. + +#### Coordinator Actions + +Each Coordinator endpoint gets its own lock, which is used to serialize calls to the Coordinator. +It guarantees that only one concurrent request is sent to a coordinator endpoint. This allows +coordinators to simply look at the current state of the tasks to determine the SLA (without having +to worry about in-flight and pending requests). However, if there are multiple coordinators, +maintenance can be done in parallel across all the coordinators. + +_Note: A single concurrent request to a coordinator endpoint does not translate to an exactly-once 
The coordinator must be able to handle duplicate drain +requests for the same task._ + + + http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/docs/operations/configuration.md ---------------------------------------------------------------------- diff --git a/docs/operations/configuration.md b/docs/operations/configuration.md index 85a6fab..4890c09 100644 --- a/docs/operations/configuration.md +++ b/docs/operations/configuration.md @@ -312,19 +312,19 @@ increased). To enable this in the Scheduler, you can set the following options: - --enable_update_affinity=true - --update_affinity_reservation_hold_time=3mins + -enable_update_affinity=true + -update_affinity_reservation_hold_time=3mins You will need to tune the hold time to match the behavior you see in your cluster. If you have extremely high update throughput, you might have to extend it as processing updates could easily add significant delays between scheduling attempts. You may also have to tune scheduling parameters to achieve the throughput you need in your cluster. Some relevant settings (with defaults) are: - --max_schedule_attempts_per_sec=40 - --initial_schedule_penalty=1secs - --max_schedule_penalty=1mins - --scheduling_max_batch_size=3 - --max_tasks_per_schedule_attempt=5 + -max_schedule_attempts_per_sec=40 + -initial_schedule_penalty=1secs + -max_schedule_penalty=1mins + -scheduling_max_batch_size=3 + -max_tasks_per_schedule_attempt=5 There are metrics exposed by the Scheduler which can provide guidance on where the bottleneck is. Example metrics to look at: @@ -337,3 +337,44 @@ Example metrics to look at: Most likely you'll run into limits with the number of update instances that can be processed per minute before you run into any other limits. So if your total work done per minute starts to exceed 2k instances, you may need to extend the update_affinity_reservation_hold_time. + +## Cluster Maintenance + +Aurora performs maintenance related task drains. 
How often the scheduler polls +for maintenance work can be controlled via, + + -host_maintenance_polling_interval=1min + +## Enforcing SLA limitations + +Since tasks can specify their own `SLAPolicy`, the cluster needs to limit these SLA requirements. +Too aggressive a requirement can permanently block any type of maintenance work +(ex: OS/Kernel/Security upgrades) on a host and hold it hostage. + +An operator can control the limits for SLA requirements via these scheduler configuration options: + + -max_sla_duration_secs=2hrs + -min_required_instances_for_sla_check=20 + +_Note: These limits only apply for `CountSlaPolicy` and `PercentageSlaPolicy`._ + +### Limiting Coordinator SLA + +With `CoordinatorSlaPolicy` the SLA calculation is off-loaded to an external HTTP service. Some +relevant scheduler configuration options are, + + -sla_coordinator_timeout=1min + -max_parallel_coordinated_maintenance=10 + +Handing off the SLA calculation to an external service can potentially block maintenance +on hosts for an indefinite amount of time (either due to a mis-configured coordinator or due to +a legitimately degraded service). In those situations the following metrics will be helpful to identify the +offending tasks. + + sla_coordinator_user_errors_* (counter tracking number of times the coordinator for the task + returned a bad response.) + sla_coordinator_errors_* (counter tracking number of times the scheduler was not able + to communicate with the coordinator of the task.) + sla_coordinator_lock_starvation_* (counter tracking number of times the scheduler was not able to + get the lock for the coordinator of the task.) 
+ http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/docs/reference/configuration.md ---------------------------------------------------------------------- diff --git a/docs/reference/configuration.md b/docs/reference/configuration.md index d4b869b..acab4c5 100644 --- a/docs/reference/configuration.md +++ b/docs/reference/configuration.md @@ -23,6 +23,7 @@ configuration design. - [Announcer Objects](#announcer-objects) - [Container Objects](#container) - [LifecycleConfig Objects](#lifecycleconfig-objects) + - [SlaPolicy Objects](#slapolicy-objects) - [Specifying Scheduling Constraints](#specifying-scheduling-constraints) - [Template Namespaces](#template-namespaces) - [mesos Namespace](#mesos-namespace) @@ -343,7 +344,7 @@ Job Schema ```contact``` | String | Best email address to reach the owner of the job. For production jobs, this is usually a team mailing list. ```instances```| Integer | Number of instances (sometimes referred to as replicas or shards) of the task to create. (Default: 1) ```cron_schedule``` | String | Cron schedule in cron format. May only be used with non-service jobs. See [Cron Jobs](../features/cron-jobs.md) for more information. Default: None (not a cron job.) - ```cron_collision_policy``` | String | Policy to use when a cron job is triggered while a previous run is still active. KILL_EXISTING Kill the previous run, and schedule the new run CANCEL_NEW Let the previous run continue, and cancel the new run. (Default: KILL_EXISTING) + ```cron_collision_policy``` | String | Policy to use when a cron job is triggered while a previous run is still active. KILL\_EXISTING Kill the previous run, and schedule the new run CANCEL\_NEW Let the previous run continue, and cancel the new run. (Default: KILL_EXISTING) ```update_config``` | ```UpdateConfig``` object | Parameters for controlling the rate and policy of rolling updates. ```constraints``` | dict | Scheduling constraints for the tasks. 
See the section on the [constraint specification language](#specifying-scheduling-constraints) ```service``` | Boolean | If True, restart tasks regardless of success or failure. (Default: False) @@ -359,6 +360,7 @@ Job Schema ```partition_policy``` | ```PartitionPolicy``` object | An optional partition policy that allows job owners to define how to handle partitions for running tasks (in partition-aware Aurora clusters) ```metadata``` | list of ```Metadata``` objects | list of ```Metadata``` objects for user's customized metadata information. ```executor_config``` | ```ExecutorConfig``` object | Allows choosing an alternative executor defined in `custom_executor_config` to be used instead of Thermos. Tasks will be launched with Thermos as the executor by default. See [Custom Executors](../features/custom-executors.md) for more info. + ```sla_policy``` | Choice of ```CountSlaPolicy```, ```PercentageSlaPolicy``` or ```CoordinatorSlaPolicy``` object | An optional SLA policy that allows job owners to describe the SLA requirements for the job. See [SlaPolicy Objects](#slapolicy-objects) for more information. ### UpdateConfig Objects @@ -564,7 +566,7 @@ See [Docker Command Line Reference](https://docs.docker.com/reference/commandlin ```graceful_shutdown_wait_secs``` | Integer | The amount of time (in seconds) to wait after hitting the ```graceful_shutdown_endpoint``` before proceeding with the [task termination lifecycle](https://aurora.apache.org/documentation/latest/reference/task-lifecycle/#forceful-termination-killing-restarting). (Default: 5) ```shutdown_wait_secs``` | Integer | The amount of time (in seconds) to wait after hitting the ```shutdown_endpoint``` before proceeding with the [task termination lifecycle](https://aurora.apache.org/documentation/latest/reference/task-lifecycle/#forceful-termination-killing-restarting). 
(Default: 5) -#### graceful_shutdown_endpoint +#### graceful\_shutdown\_endpoint If the Job is listening on the port as specified by the HttpLifecycleConfig (default: `health`), a HTTP POST request will be sent over localhost to this @@ -581,6 +583,34 @@ does not shut down on its own after `shutdown_wait_secs` seconds, it will be forcefully killed. +### SlaPolicy Objects + +Configuration for specifying custom [SLA requirements](../features/sla-requirements.md) for a job. There are 3 supported SLA policies +namely, [`CountSlaPolicy`](#countslapolicy-objects), [`PercentageSlaPolicy`](#percentageslapolicy-objects) and [`CoordinatorSlaPolicy`](#coordinatorslapolicy-objects). + + +### CountSlaPolicy Objects + + param | type | description + ----- | :----: | ----------- + ```count``` | Integer | The number of active instances required every `durationSecs`. + ```duration_secs``` | Integer | Minimum time duration a task needs to be `RUNNING` to be treated as active. + +### PercentageSlaPolicy Objects + + param | type | description + ----- | :----: | ----------- + ```percentage``` | Float | The percentage of active instances required every `durationSecs`. + ```duration_secs``` | Integer | Minimum time duration a task needs to be `RUNNING` to be treated as active. + +### CoordinatorSlaPolicy Objects + + param | type | description + ----- | :----: | ----------- + ```coordinator_url``` | String | The URL to the [Coordinator](../features/sla-requirements.md#coordinator) service to be contacted before performing SLA affecting actions (job updates, host drains etc). + ```status_key``` | String | The field in the Coordinator response that indicates the SLA status for working on the task. 
(Default: `drain`) + + Specifying Scheduling Constraints ================================= http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/docs/reference/scheduler-configuration.md ---------------------------------------------------------------------- diff --git a/docs/reference/scheduler-configuration.md b/docs/reference/scheduler-configuration.md index a659cfa..805e516 100644 --- a/docs/reference/scheduler-configuration.md +++ b/docs/reference/scheduler-configuration.md @@ -106,6 +106,8 @@ Optional flags: Minimum guaranteed time for task history retention before any pruning is attempted. -history_prune_threshold (default (2, days)) Time after which the scheduler will prune terminated task history. +-host_maintenance_polling_interval (default (1, minute)) + Interval between polling for pending host maintenance requests. -hostname The hostname to advertise in ZooKeeper instead of the locally-resolved hostname. -http_authentication_mechanism (default NONE) @@ -134,6 +136,8 @@ Optional flags: Maximum delay between attempts to schedule a flapping task. -max_leading_duration (default (1, days)) After leading for this duration, the scheduler should commit suicide. +-max_parallel_coordinated_maintenance (default 10) + Maximum number of coordinators that can be contacted in parallel. -max_registration_delay (default (1, mins)) Max allowable delay to allow the driver to register before aborting -max_reschedule_task_delay_on_startup (default (30, secs)) @@ -144,6 +148,8 @@ Optional flags: Maximum number of scheduling attempts to make per second. -max_schedule_penalty (default (1, mins)) Maximum delay between attempts to schedule a PENDING tasks. +-max_sla_duration_secs (default (2, hrs)) + Maximum duration window for which SLA requirements are to be satisfied. This does not apply to jobs that have a CoordinatorSlaPolicy. -max_status_update_batch_size (default 1000) [must be > 0] The maximum number of status updates that can be processed in a batch. 
-max_task_event_batch_size (default 300) [must be > 0] @@ -156,6 +162,8 @@ Optional flags: Upper limit on the number of failures allowed during a job update. This helps cap potentially unbounded entries into storage. -min_offer_hold_time (default (5, mins)) Minimum amount of time to hold a resource offer before declining. +-min_required_instances_for_sla_check (default 20) + Minimum number of instances required for a job to be eligible for SLA check. This does not apply to jobs that have a CoordinatorSlaPolicy. -native_log_election_retries (default 20) The maximum number of attempts to obtain a new log writer. -native_log_election_timeout (default (15, secs)) @@ -214,6 +222,8 @@ Optional flags: Path to shiro.ini for authentication and authorization configuration. -shiro_realm_modules (default [class org.apache.aurora.scheduler.http.api.security.IniShiroRealmModule]) Guice modules for configuring Shiro Realms. +-sla_coordinator_timeout (default (1, min)) [must be > 0] + Timeout interval for communicating with Coordinator. -sla_non_prod_metrics (default []) Metric categories collected for non production tasks. -sla_prod_metrics (default [JOB_UPTIMES, PLATFORM_UPTIME, MEDIANS]) http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/examples/vagrant/systemd/aurora-scheduler.service ---------------------------------------------------------------------- diff --git a/examples/vagrant/systemd/aurora-scheduler.service b/examples/vagrant/systemd/aurora-scheduler.service index 57e4bba..fa76ad0 100644 --- a/examples/vagrant/systemd/aurora-scheduler.service +++ b/examples/vagrant/systemd/aurora-scheduler.service @@ -31,6 +31,8 @@ Environment="JAVA_OPTS='-Djava.library.path=/usr/lib \ -Xdebug \ -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=5005'" WorkingDirectory=/home/vagrant/aurora/dist/install/aurora-scheduler + +# min_offer_hold_time is set to 1mins to shorten time to update attributes on offers. AURORA-1985. 
ExecStart=/home/vagrant/aurora/dist/install/aurora-scheduler/bin/aurora-scheduler \ -cluster_name=devcluster \ -hostname=aurora.local \ @@ -58,7 +60,9 @@ ExecStart=/home/vagrant/aurora/dist/install/aurora-scheduler/bin/aurora-schedule -allow_container_volumes=true \ -offer_filter_duration=0secs \ -mesos_driver=V1_DRIVER \ - -unavailability_threshold=1mins + -unavailability_threshold=1mins \ + -min_required_instances_for_sla_check=2 \ + -min_offer_hold_time=1mins User=root Group=root Restart=always http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/main/java/org/apache/aurora/scheduler/app/AppModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/app/AppModule.java b/src/main/java/org/apache/aurora/scheduler/app/AppModule.java index ffc0744..edc13d4 100644 --- a/src/main/java/org/apache/aurora/scheduler/app/AppModule.java +++ b/src/main/java/org/apache/aurora/scheduler/app/AppModule.java @@ -26,6 +26,7 @@ import com.google.inject.AbstractModule; import org.apache.aurora.GuiceUtils; import org.apache.aurora.common.inject.TimedInterceptor; +import org.apache.aurora.common.quantity.Time; import org.apache.aurora.common.stats.Stats; import org.apache.aurora.common.stats.StatsProvider; import org.apache.aurora.common.util.Clock; @@ -46,6 +47,7 @@ import org.apache.aurora.scheduler.events.PubsubEventModule; import org.apache.aurora.scheduler.filter.SchedulingFilter; import org.apache.aurora.scheduler.filter.SchedulingFilterImpl; import org.apache.aurora.scheduler.http.JettyServerModule; +import org.apache.aurora.scheduler.maintenance.MaintenanceModule; import org.apache.aurora.scheduler.mesos.SchedulerDriverModule; import org.apache.aurora.scheduler.metadata.MetadataModule; import org.apache.aurora.scheduler.offers.OfferManagerModule; @@ -146,6 +148,8 @@ public class AppModule extends AbstractModule { opts.main.allowGpuResource, opts.app.enableMesosFetcher, 
opts.app.allowContainerVolumes, + opts.sla.minRequiredInstances, + opts.sla.maxSlaDuration.as(Time.SECONDS), opts.app.allowedJobEnvironments), opts.main.driverImpl, opts); @@ -190,6 +194,7 @@ public class AppModule extends AbstractModule { install(new StateModule(options)); install(new SlaModule(options.sla)); install(new UpdaterModule(options.updater)); + install(new MaintenanceModule(options.maintenance)); bind(StatsProvider.class).toInstance(Stats.STATS_PROVIDER); } } http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/main/java/org/apache/aurora/scheduler/config/CliOptions.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/config/CliOptions.java b/src/main/java/org/apache/aurora/scheduler/config/CliOptions.java index a2fb039..2dbd535 100644 --- a/src/main/java/org/apache/aurora/scheduler/config/CliOptions.java +++ b/src/main/java/org/apache/aurora/scheduler/config/CliOptions.java @@ -35,6 +35,7 @@ import org.apache.aurora.scheduler.http.api.security.HttpSecurityModule; import org.apache.aurora.scheduler.http.api.security.IniShiroRealmModule; import org.apache.aurora.scheduler.http.api.security.Kerberos5ShiroRealmModule; import org.apache.aurora.scheduler.log.mesos.MesosLogStreamModule; +import org.apache.aurora.scheduler.maintenance.MaintenanceModule; import org.apache.aurora.scheduler.mesos.CommandLineDriverSettingsModule; import org.apache.aurora.scheduler.offers.OfferManagerModule; import org.apache.aurora.scheduler.preemptor.PreemptorModule; @@ -86,6 +87,7 @@ public class CliOptions { public final StatsModule.Options stats = new StatsModule.Options(); public final CronModule.Options cron = new CronModule.Options(); public final ResourceSettings resourceSettings = new ResourceSettings(); + public final MaintenanceModule.Options maintenance = new MaintenanceModule.Options(); final List<Object> custom; public CliOptions() { 
http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java b/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java index 4073229..2fd1746 100644 --- a/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java +++ b/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java @@ -17,7 +17,6 @@ import java.util.List; import java.util.Optional; import java.util.regex.Pattern; import java.util.stream.Collectors; - import javax.annotation.Nullable; import javax.inject.Inject; @@ -30,8 +29,11 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import org.apache.aurora.gen.Container; +import org.apache.aurora.gen.CountSlaPolicy; import org.apache.aurora.gen.DockerParameter; import org.apache.aurora.gen.JobConfiguration; +import org.apache.aurora.gen.PercentageSlaPolicy; +import org.apache.aurora.gen.SlaPolicy; import org.apache.aurora.gen.TaskConfig; import org.apache.aurora.gen.TaskConstraint; import org.apache.aurora.scheduler.TierManager; @@ -99,6 +101,8 @@ public class ConfigurationManager { private final boolean enableMesosFetcher; private final boolean allowContainerVolumes; private final Pattern allowedJobEnvironments; + private final int minRequiredInstances; + private final long maxSlaDurationSecs; public ConfigurationManagerSettings( ImmutableSet<Container._Fields> allowedContainerTypes, @@ -108,6 +112,8 @@ public class ConfigurationManager { boolean allowGpuResource, boolean enableMesosFetcher, boolean allowContainerVolumes, + int minRequiredInstances, + long maxSlaDurationSecs, String allowedJobEnvironment) { this.allowedContainerTypes = requireNonNull(allowedContainerTypes); @@ -118,6 +124,8 @@ public 
class ConfigurationManager { this.enableMesosFetcher = enableMesosFetcher; this.allowContainerVolumes = allowContainerVolumes; this.allowedJobEnvironments = Pattern.compile(requireNonNull(allowedJobEnvironment)); + this.minRequiredInstances = minRequiredInstances; + this.maxSlaDurationSecs = maxSlaDurationSecs; } } @@ -196,7 +204,9 @@ public class ConfigurationManager { } builder.setTaskConfig( - validateAndPopulate(ITaskConfig.build(builder.getTaskConfig())).newBuilder()); + validateAndPopulate( + ITaskConfig.build(builder.getTaskConfig()), + builder.getInstanceCount()).newBuilder()); // Only one of [service=true, cron_schedule] may be set. if (!Strings.isNullOrEmpty(job.getCronSchedule()) && builder.getTaskConfig().isIsService()) { @@ -207,6 +217,89 @@ public class ConfigurationManager { return IJobConfiguration.build(builder); } + /** + * Validates the {@link TaskConfig}'s {@link SlaPolicy}, if any. + * + * A valid {@link SlaPolicy} is one that allows at least 1 instance to be drained for a job. + * + * @param config {@link TaskConfig} to be validated. + * @param instanceCount number of instances in the job. + * @throws TaskDescriptionException thrown when {@link SlaPolicy} is not valid. 
+ */ + private void validateSlaPolicy( + TaskConfig config, + int instanceCount) throws TaskDescriptionException { + + if (config.isSetSlaPolicy()) { + if (tierManager.getTier(ITaskConfig.build(config)).isRevocable() + || tierManager.getTier(ITaskConfig.build(config)).isPreemptible()) { + throw new TaskDescriptionException(String.format( + "Tier '%s' does not support SlaPolicy.", + config.getTier())); + } + + SlaPolicy slaPolicy = config.getSlaPolicy(); + if (!slaPolicy.isSetCoordinatorSlaPolicy() + && instanceCount < settings.minRequiredInstances) { + throw new TaskDescriptionException(String.format( + "Job with fewer than %d instances cannot have Percentage/Count SlaPolicy.", + settings.minRequiredInstances)); + } + + if (slaPolicy.isSetCountSlaPolicy()) { + validateCountSlaPolicy(instanceCount, slaPolicy.getCountSlaPolicy()); + } + + if (slaPolicy.isSetPercentageSlaPolicy()) { + validatePercentageSlaPolicy(instanceCount, slaPolicy.getPercentageSlaPolicy()); + } + } + } + + private void validatePercentageSlaPolicy( + int instanceCount, + PercentageSlaPolicy slaPolicy) throws TaskDescriptionException { + if (slaPolicy.isSetPercentage()) { + double percentage = slaPolicy.getPercentage() / 100.0; + if (instanceCount - instanceCount * percentage < 1) { + throw new TaskDescriptionException(String.format( + "Current PercentageSlaPolicy: percentage=%f will not allow any instances to be killed. 
" + + "Must be less than %f.", + slaPolicy.getPercentage(), + ((double) (instanceCount - 1)) / instanceCount * 100.0)); + } + } + + if (slaPolicy.isSetDurationSecs() + && slaPolicy.getDurationSecs() > settings.maxSlaDurationSecs) { + throw new TaskDescriptionException(String.format( + "PercentageSlaPolicy: durationSecs=%d must be less than cluster-wide maximum of %d secs.", + slaPolicy.getDurationSecs(), + settings.maxSlaDurationSecs)); + } + } + + private void validateCountSlaPolicy( + int instanceCount, + CountSlaPolicy slaPolicy) throws TaskDescriptionException { + if (slaPolicy.isSetCount() + && instanceCount - slaPolicy.getCount() < 1) { + throw new TaskDescriptionException(String.format( + "Current CountSlaPolicy: count=%d will not allow any instances to be killed. " + + "Must be less than instanceCount=%d.", + slaPolicy.getCount(), + instanceCount)); + } + + if (slaPolicy.isSetDurationSecs() + && slaPolicy.getDurationSecs() > settings.maxSlaDurationSecs) { + throw new TaskDescriptionException(String.format( + "CountSlaPolicy: durationSecs=%d must be less than cluster-wide maximum of %d secs.", + slaPolicy.getDurationSecs(), + settings.maxSlaDurationSecs)); + } + } + @VisibleForTesting static final String NO_DOCKER_PARAMETERS = "This scheduler is configured to disallow Docker parameters."; @@ -236,10 +329,14 @@ public class ConfigurationManager { * * * @param config Task config to validate and populate. + * @param instanceCount The number of instances in the job. * @return A reference to the modified {@code config} (for chaining). * @throws TaskDescriptionException If the task is invalid. 
*/ - public ITaskConfig validateAndPopulate(ITaskConfig config) throws TaskDescriptionException { + public ITaskConfig validateAndPopulate( + ITaskConfig config, + int instanceCount) throws TaskDescriptionException { + TaskConfig builder = config.newBuilder(); if (config.isSetTier() && !UserProvidedStrings.isGoodIdentifier(config.getTier())) { @@ -374,6 +471,8 @@ public class ConfigurationManager { } } + validateSlaPolicy(builder, instanceCount); + maybeFillLinks(builder); return ITaskConfig.build(builder); http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/main/java/org/apache/aurora/scheduler/maintenance/MaintenanceController.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/maintenance/MaintenanceController.java b/src/main/java/org/apache/aurora/scheduler/maintenance/MaintenanceController.java new file mode 100644 index 0000000..dd2462d --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/maintenance/MaintenanceController.java @@ -0,0 +1,470 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.scheduler.maintenance; + +import java.lang.annotation.Retention; +import java.lang.annotation.Target; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import javax.inject.Inject; +import javax.inject.Qualifier; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.base.Predicates; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; +import com.google.common.eventbus.Subscribe; +import com.google.common.util.concurrent.AbstractScheduledService; + +import org.apache.aurora.common.inject.TimedInterceptor.Timed; +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Time; +import org.apache.aurora.common.stats.StatsProvider; +import org.apache.aurora.gen.HostMaintenanceRequest; +import org.apache.aurora.gen.HostStatus; +import org.apache.aurora.gen.MaintenanceMode; +import org.apache.aurora.gen.PercentageSlaPolicy; +import org.apache.aurora.gen.ScheduleStatus; +import org.apache.aurora.gen.SlaPolicy; +import org.apache.aurora.scheduler.BatchWorker; +import org.apache.aurora.scheduler.SchedulerModule.TaskEventBatchWorker; +import org.apache.aurora.scheduler.base.Query; +import org.apache.aurora.scheduler.base.Tasks; +import org.apache.aurora.scheduler.config.types.TimeAmount; +import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber; +import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; +import org.apache.aurora.scheduler.sla.SlaManager; +import org.apache.aurora.scheduler.state.StateManager; +import org.apache.aurora.scheduler.storage.AttributeStore; +import org.apache.aurora.scheduler.storage.Storage; +import 
org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; +import org.apache.aurora.scheduler.storage.Storage.StoreProvider; +import org.apache.aurora.scheduler.storage.entities.IHostAttributes; +import org.apache.aurora.scheduler.storage.entities.IHostMaintenanceRequest; +import org.apache.aurora.scheduler.storage.entities.IHostStatus; +import org.apache.aurora.scheduler.storage.entities.IScheduledTask; +import org.apache.aurora.scheduler.storage.entities.ISlaPolicy; +import org.apache.mesos.v1.Protos; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.ElementType.PARAMETER; +import static java.lang.annotation.RetentionPolicy.RUNTIME; +import static java.util.Objects.requireNonNull; + +import static com.google.common.base.Preconditions.checkNotNull; + +import static org.apache.aurora.gen.MaintenanceMode.DRAINED; +import static org.apache.aurora.gen.MaintenanceMode.DRAINING; + +/** + * Logic that puts hosts into maintenance mode, and triggers draining of hosts upon request. + * All state-changing functions return their results. Additionally, all state-changing functions + * will ignore requests to change state of unknown hosts and subsequently omit these hosts from + * return values. + */ +public interface MaintenanceController { + + /** + * Places hosts in maintenance mode. + * Hosts in maintenance mode are less-preferred for scheduling. + * No change will be made for hosts that are not recognized, and unrecognized hosts will not be + * included in the result. + * + * @param hosts Hosts to put into maintenance mode. + * @return The adjusted state of the hosts. + */ + Set<IHostStatus> startMaintenance(Set<String> hosts); + + /** + * Initiate a drain of all active tasks on {@code hosts}. + * + * @param hosts Hosts to drain. + * @return The adjusted state of the hosts. 
Hosts without any active tasks will be immediately + * moved to DRAINED. + */ + Set<IHostStatus> drain(Set<String> hosts); + + /** + * Initiate an SLA-aware drain of all active tasks on {@code hosts}. + * + * @param hosts Hosts to drain. + * @param defaultSlaPolicy SlaPolicy to use if a task does not have an SlaPolicy. + * @param timeoutSecs Interval after which tasks will be forcefully drained without checking SLA. + * @return The adjusted state of the hosts. Hosts without any active tasks will be immediately + * moved to DRAINED. + */ + Set<IHostStatus> slaDrain(Set<String> hosts, SlaPolicy defaultSlaPolicy, long timeoutSecs); + + /** + * Drain tasks defined by the inverse offer. + * This method doesn't set any host attributes. + * + * @param inverseOffer the inverse offer to use. + */ + void drainForInverseOffer(Protos.InverseOffer inverseOffer); + + /** + * Fetches the current maintenance mode of {@code host}. + * + * @param host Host to fetch state for. + * @return Maintenance mode of host, {@link MaintenanceMode#NONE} if the host is not known. + */ + MaintenanceMode getMode(String host); + + /** + * Fetches the current state of {@code hosts}. + * + * @param hosts Hosts to fetch state for. + * @return The state of the hosts. + */ + Set<IHostStatus> getStatus(Set<String> hosts); + + /** + * Moves {@code hosts} out of maintenance mode, returning them to mode NONE. + * + * @param hosts Hosts to move out of maintenance mode. + * @return The adjusted state of the hosts. + */ + Set<IHostStatus> endMaintenance(Set<String> hosts); + + /** + * Records the maintenance requests for hosts and drains any active tasks from the host + * asynchronously. + * + * Tasks are drained iff it will satisfy the required SLA for the task. Task's SLA is either the + * {@link SlaPolicy} configured as part of the TaskConfig or the default {@link SlaPolicy} + * specified as part of the maintenance request. If neither then the task is drained immediately.
+ * + * In order to avoid tasks from blocking maintenance perpetually, each maintenance request has a + * timeout after which all tasks are forcefully drained. + */ + class MaintenanceControllerImpl + extends AbstractScheduledService implements MaintenanceController, EventSubscriber { + private static final Logger LOG = LoggerFactory.getLogger(MaintenanceControllerImpl.class); + + @VisibleForTesting + @Qualifier + @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) + public @interface PollingInterval { } + + @VisibleForTesting + static final String DRAINING_MESSAGE = "Draining machine for maintenance."; + + private static final String MISSING_MAINTENANCE_REQUEST = "missing_maintenance_request"; + private static final SlaPolicy ZERO_PERCENT_SLA = SlaPolicy.percentageSlaPolicy( + new PercentageSlaPolicy() + .setPercentage(0) + .setDurationSecs(0)); + + private final Storage storage; + private final Amount<Long, Time> pollingInterval; + private final TaskEventBatchWorker batchWorker; + private final SlaManager slaManager; + private final StateManager stateManager; + + private final AtomicLong missingMaintenanceCounter; + + @Inject + public MaintenanceControllerImpl( + Storage storage, + @PollingInterval Amount<Long, Time> pollingInterval, + TaskEventBatchWorker batchWorker, + SlaManager slaManager, + StateManager stateManager, + StatsProvider statsProvider) { + + this.storage = requireNonNull(storage); + this.pollingInterval = checkNotNull(pollingInterval); + this.batchWorker = requireNonNull(batchWorker); + this.slaManager = requireNonNull(slaManager); + this.stateManager = requireNonNull(stateManager); + this.missingMaintenanceCounter = statsProvider.makeCounter(MISSING_MAINTENANCE_REQUEST); + } + + private Set<String> drainTasksOnHost(String host, StoreProvider store) { + Query.Builder query = Query.slaveScoped(host).active(); + + List<IScheduledTask> candidates = new ArrayList<>(store.getTaskStore().fetchTasks(query)); + + if (candidates.isEmpty()) { +
LOG.info("No tasks to drain on host: {}", host); + return Collections.emptySet(); + } + + // shuffle the candidates to avoid head-of-line blocking + Collections.shuffle(candidates); + candidates.forEach(task -> drainTask(task, store)); + + return candidates.stream().map(Tasks::id).collect(Collectors.toSet()); + } + + private Set<IHostStatus> watchDrainingTasks(MutableStoreProvider store, Set<String> hosts) { + LOG.info("Hosts to drain: " + hosts); + Set<String> emptyHosts = Sets.newHashSet(); + for (String host : hosts) { + Set<String> drainedTasks = drainTasksOnHost(host, store); + // If there are no tasks on the host, immediately transition to DRAINED. + if (drainedTasks.isEmpty()) { + emptyHosts.add(host); + } + } + + return ImmutableSet.<IHostStatus>builder() + .addAll(setMaintenanceMode(store, emptyHosts, DRAINED)) + .addAll(setMaintenanceMode(store, Sets.difference(hosts, emptyHosts), DRAINING)) + .build(); + } + + /** + * Notifies the MaintenanceController that a task has changed state. + * + * @param change Event + */ + @Subscribe + public void taskChangedState(final TaskStateChange change) { + if (Tasks.isTerminated(change.getNewState())) { + final String host = change.getTask().getAssignedTask().getSlaveHost(); + batchWorker.execute(store -> { + // If the task _was_ associated with a draining host, and it was the last task on the + // host. 
+ Optional<IHostAttributes> attributes = + store.getAttributeStore().getHostAttributes(host); + if (attributes.isPresent() && attributes.get().getMode() == DRAINING) { + Query.Builder builder = Query.slaveScoped(host).active(); + Iterable<IScheduledTask> activeTasks = store.getTaskStore().fetchTasks(builder); + if (Iterables.isEmpty(activeTasks)) { + LOG.info("Moving host {} into DRAINED", host); + setMaintenanceMode(store, ImmutableSet.of(host), DRAINED); + } else { + LOG.info("Host {} is DRAINING with active tasks: {}", host, Tasks.ids(activeTasks)); + } + } + return BatchWorker.NO_RESULT; + }); + } + } + + @Override + public Set<IHostStatus> startMaintenance(Set<String> hosts) { + return storage.write( + storeProvider -> setMaintenanceMode(storeProvider, hosts, MaintenanceMode.SCHEDULED)); + } + + private void recordMaintenanceRequests( + MutableStoreProvider store, + Set<String> hosts, + SlaPolicy defaultSlaPolicy, + long timeoutSecs) { + + hosts.forEach( + host -> store.getHostMaintenanceStore().saveHostMaintenanceRequest( + IHostMaintenanceRequest.build( + new HostMaintenanceRequest() + .setHost(host) + .setDefaultSlaPolicy(defaultSlaPolicy) + .setTimeoutSecs(timeoutSecs) + .setCreatedTimestampMs(System.currentTimeMillis())))); + } + + @Override + public Set<IHostStatus> drain(Set<String> hosts) { + return storage.write(store -> { + // Create a dummy maintenance request zero percent sla and timeout to force drain. + recordMaintenanceRequests(store, hosts, ZERO_PERCENT_SLA, 0); + return watchDrainingTasks(store, hosts); + }); + } + + @Override + public Set<IHostStatus> slaDrain( + Set<String> hosts, + SlaPolicy defaultSlaPolicy, + long timeoutSecs) { + + // We can have only one maintenance request per host at any time. + // So we will simply overwrite any existing request. If the current one is actively handled, + // during the write, the new one will just be a no-op, since the host is already being + // drained. 
If host is in DRAINED it will be moved back into DRAINING and then back into + // DRAINED without having to perform any work. + return storage.write(store -> { + recordMaintenanceRequests(store, hosts, defaultSlaPolicy, timeoutSecs); + return setMaintenanceMode(store, hosts, DRAINING); + }); + } + + private Optional<String> getHostname(Protos.InverseOffer offer) { + if (offer.getUrl().getAddress().hasHostname()) { + return Optional.of(offer.getUrl().getAddress().getHostname()); + } else { + return Optional.empty(); + } + } + + @Override + public void drainForInverseOffer(Protos.InverseOffer offer) { + // TaskStore does not allow for querying by agent id. + Optional<String> hostname = getHostname(offer); + + if (hostname.isPresent()) { + String host = hostname.get(); + storage.write(storeProvider -> { + // Create a dummy maintenance request zero percent sla and timeout to force drain. + recordMaintenanceRequests(storeProvider, ImmutableSet.of(host), ZERO_PERCENT_SLA, 0); + return drainTasksOnHost(host, storeProvider); + }); + } else { + LOG.error("Unable to drain tasks on agent {} because " + + "no hostname attached to inverse offer {}.", offer.getAgentId(), offer.getId()); + } + } + + private static final Function<IHostAttributes, String> HOST_NAME = + IHostAttributes::getHost; + + private static final Function<IHostAttributes, IHostStatus> ATTRS_TO_STATUS = + attributes -> IHostStatus.build( + new HostStatus().setHost(attributes.getHost()).setMode(attributes.getMode())); + + private static final Function<IHostStatus, MaintenanceMode> GET_MODE = IHostStatus::getMode; + + @Override + public MaintenanceMode getMode(final String host) { + return storage.read(storeProvider -> storeProvider.getAttributeStore().getHostAttributes(host) + .map(ATTRS_TO_STATUS) + .map(GET_MODE) + .orElse(MaintenanceMode.NONE)); + } + + @Override + public Set<IHostStatus> getStatus(final Set<String> hosts) { + return storage.read(storeProvider -> { + // Warning - this is filtering _all_ host 
attributes. If using this to frequently query + // for a small set of hosts, a getHostAttributes variant should be added. + return storeProvider.getAttributeStore().getHostAttributes().stream() + .filter(Predicates.compose(Predicates.in(hosts), HOST_NAME)) + .map(ATTRS_TO_STATUS) + .collect(Collectors.toSet()); + }); + } + + @Override + public Set<IHostStatus> endMaintenance(final Set<String> hosts) { + return storage.write( + storeProvider -> { + hosts.forEach( + h -> storeProvider.getHostMaintenanceStore().removeHostMaintenanceRequest(h)); + return setMaintenanceMode(storeProvider, hosts, MaintenanceMode.NONE); + }); + } + + private Set<IHostStatus> setMaintenanceMode( + MutableStoreProvider storeProvider, + Set<String> hosts, + MaintenanceMode mode) { + + AttributeStore.Mutable store = storeProvider.getAttributeStore(); + ImmutableSet.Builder<IHostStatus> statuses = ImmutableSet.builder(); + for (String host : hosts) { + LOG.info("Setting maintenance mode to {} for host {}", mode, host); + Optional<IHostAttributes> toSave = AttributeStore.Util.mergeMode(store, host, mode); + if (toSave.isPresent()) { + store.saveHostAttributes(toSave.get()); + LOG.info("Updated host attributes: " + toSave.get()); + statuses.add(IHostStatus.build(new HostStatus().setHost(host).setMode(mode))); + } + } + return statuses.build(); + } + + @VisibleForTesting + void runForTest() { + runOneIteration(); + } + + @Timed + @Override + protected void runOneIteration() { + LOG.info("Looking for hosts in DRAINING state"); + storage.read(store -> { + store.getAttributeStore() + .getHostAttributes() + .stream() + .filter(h -> h.getMode() == DRAINING) + .forEach(h -> { + if (drainTasksOnHost(h.getHost(), store).isEmpty()) { + storage.write(mutable -> setMaintenanceMode( + mutable, + ImmutableSet.of(h.getHost()), + DRAINED)); + } + }); + return null; + }); + } + + @Override + protected Scheduler scheduler() { + return Scheduler.newFixedDelaySchedule( + pollingInterval.getValue(), + 
pollingInterval.getValue(), + pollingInterval.getUnit().getTimeUnit()); + } + + private void drainTask(IScheduledTask task, StoreProvider store) { + String host = task.getAssignedTask().getSlaveHost(); + Optional<IHostMaintenanceRequest> hostMaintenanceRequest = + store.getHostMaintenanceStore().getHostMaintenanceRequest(host); + if (!hostMaintenanceRequest.isPresent()) { + LOG.error("No maintenance request found for host: {}. Assuming SLA not satisfied.", host); + missingMaintenanceCounter.incrementAndGet(); + return; + } + + boolean force = false; + long expireMs = + System.currentTimeMillis() - hostMaintenanceRequest.get().getCreatedTimestampMs(); + if (hostMaintenanceRequest.get().getTimeoutSecs() + < TimeAmount.of(expireMs, Time.MILLISECONDS).as(Time.SECONDS)) { + LOG.warn("Maintenance request timed out for host: {} after {} secs. Forcing drain of {}.", + host, hostMaintenanceRequest.get().getTimeoutSecs(), Tasks.id(task)); + force = true; + } + + final ISlaPolicy slaPolicy = task.getAssignedTask().getTask().isSetSlaPolicy() + ? 
task.getAssignedTask().getTask().getSlaPolicy() + : hostMaintenanceRequest.get().getDefaultSlaPolicy(); + + slaManager.checkSlaThenAct( + task, + slaPolicy, + storeProvider -> stateManager.changeState( + storeProvider, + Tasks.id(task), + Optional.empty(), + ScheduleStatus.DRAINING, + Optional.of(DRAINING_MESSAGE)), + force); + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/main/java/org/apache/aurora/scheduler/maintenance/MaintenanceModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/maintenance/MaintenanceModule.java b/src/main/java/org/apache/aurora/scheduler/maintenance/MaintenanceModule.java new file mode 100644 index 0000000..f4f4ec3 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/maintenance/MaintenanceModule.java @@ -0,0 +1,78 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.scheduler.maintenance; + +import java.util.concurrent.ScheduledExecutorService; +import javax.inject.Singleton; + +import com.beust.jcommander.Parameter; +import com.beust.jcommander.Parameters; +import com.google.inject.AbstractModule; +import com.google.inject.PrivateModule; +import com.google.inject.TypeLiteral; + +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Time; +import org.apache.aurora.scheduler.SchedulerServicesModule; +import org.apache.aurora.scheduler.base.AsyncUtil; +import org.apache.aurora.scheduler.config.types.TimeAmount; +import org.apache.aurora.scheduler.config.validators.PositiveAmount; +import org.apache.aurora.scheduler.events.PubsubEventModule; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Binding module for maintenance related logic. + */ +public class MaintenanceModule extends AbstractModule { + + private static final Logger LOG = LoggerFactory.getLogger(MaintenanceModule.class); + + @Parameters(separators = "=") + public static class Options { + @Parameter(names = "-host_maintenance_polling_interval", + validateValueWith = PositiveAmount.class, + description = "Interval between polling for pending host maintenance requests.") + public TimeAmount hostMaintenancePollingInterval = new TimeAmount(1, Time.MINUTES); + } + + private final Options options; + + public MaintenanceModule(final Options options) { + this.options = options; + } + + @Override + protected void configure() { + bind(new TypeLiteral<Amount<Long, Time>>() { }) + .annotatedWith( + MaintenanceController.MaintenanceControllerImpl.PollingInterval.class) + .toInstance(options.hostMaintenancePollingInterval); + bind(MaintenanceController.class).to(MaintenanceController.MaintenanceControllerImpl.class); + bind(MaintenanceController.MaintenanceControllerImpl.class).in(Singleton.class); + PubsubEventModule.bindSubscriber( + binder(), + 
MaintenanceController.MaintenanceControllerImpl.class); + + install(new PrivateModule() { + @Override + protected void configure() { + bind(ScheduledExecutorService.class).toInstance( + AsyncUtil.singleThreadLoggingScheduledExecutor("MaintenanceController-%d", LOG)); + } + }); + SchedulerServicesModule.addSchedulerActiveServiceBinding(binder()) + .to(MaintenanceController.MaintenanceControllerImpl.class); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java b/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java index 3b4df55..53b14e9 100644 --- a/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java +++ b/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java @@ -39,10 +39,10 @@ import org.apache.aurora.scheduler.base.SchedulerException; import org.apache.aurora.scheduler.events.EventSink; import org.apache.aurora.scheduler.events.PubsubEvent; import org.apache.aurora.scheduler.events.PubsubEventModule; +import org.apache.aurora.scheduler.maintenance.MaintenanceController; import org.apache.aurora.scheduler.offers.HostOffer; import org.apache.aurora.scheduler.offers.OfferManager; import org.apache.aurora.scheduler.offers.OfferManagerModule; -import org.apache.aurora.scheduler.state.MaintenanceController; import org.apache.aurora.scheduler.storage.AttributeStore; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/main/java/org/apache/aurora/scheduler/sla/SlaManager.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/sla/SlaManager.java 
b/src/main/java/org/apache/aurora/scheduler/sla/SlaManager.java new file mode 100644 index 0000000..98bec48 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/sla/SlaManager.java @@ -0,0 +1,437 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.sla; + +import java.lang.annotation.Retention; +import java.lang.annotation.Target; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.stream.Collectors; +import javax.inject.Qualifier; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.Striped; +import com.google.gson.Gson; +import com.google.inject.Inject; + +import org.apache.aurora.common.inject.TimedInterceptor.Timed; +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Time; +import org.apache.aurora.common.stats.StatsProvider; +import org.apache.aurora.gen.ScheduleStatus; +import org.apache.aurora.scheduler.TierManager; +import org.apache.aurora.scheduler.base.Query; +import 
org.apache.aurora.scheduler.base.Tasks; +import org.apache.aurora.scheduler.config.types.TimeAmount; +import org.apache.aurora.scheduler.storage.Storage; +import org.apache.aurora.scheduler.storage.Storage.StoreProvider; +import org.apache.aurora.scheduler.storage.entities.ICoordinatorSlaPolicy; +import org.apache.aurora.scheduler.storage.entities.IJobKey; +import org.apache.aurora.scheduler.storage.entities.IScheduledTask; +import org.apache.aurora.scheduler.storage.entities.IServerInfo; +import org.apache.aurora.scheduler.storage.entities.ISlaPolicy; +import org.apache.aurora.scheduler.storage.entities.ITaskEvent; +import org.apache.thrift.TException; +import org.apache.thrift.TSerializer; +import org.apache.thrift.protocol.TSimpleJSONProtocol; +import org.asynchttpclient.AsyncHttpClient; +import org.asynchttpclient.Param; +import org.asynchttpclient.Response; +import org.asynchttpclient.util.HttpConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.ElementType.PARAMETER; +import static java.lang.annotation.RetentionPolicy.RUNTIME; +import static java.util.Objects.requireNonNull; + +/** + * Provides methods for performing SLA-safe work. It is used for maintenance and job update + * operations to guarantee that a job's SLA requirements are always satisfied. 
+ */ +public class SlaManager { + private static final Logger LOG = LoggerFactory.getLogger(SlaManager.class); + @VisibleForTesting + @Qualifier + @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) + @interface SlaManagerExecutor { } + + @VisibleForTesting + @Qualifier + @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) + @interface MaxParallelCoordinators { } + + @VisibleForTesting + @Qualifier + @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) + @interface HttpClient { } + + @VisibleForTesting + @Qualifier + @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) + @interface MinRequiredInstances { } + + @VisibleForTesting + static final String TASK_PARAM = "task"; + + private static final String ATTEMPTS_STAT_NAME = "sla_coordinator_attempts"; + private static final String SUCCESS_STAT_NAME = "sla_coordinator_success"; + private static final String ERRORS_STAT_NAME = "sla_coordinator_errors"; + private static final String USER_ERRORS_STAT_NAME = "sla_coordinator_user_errors"; + private static final String LOCK_STARVATION_STAT_NAME = "sla_coordinator_lock_starvation"; + + private final ScheduledExecutorService executor; + private final Storage storage; + private final IServerInfo serverInfo; + private final AsyncHttpClient httpClient; + private final Striped<Lock> lock; + private final int minRequiredInstances; + private final TierManager tierManager; + + private final AtomicLong attemptsCounter; + private final AtomicLong successCounter; + private final AtomicLong errorsCounter; + private final AtomicLong userErrorsCounter; + private final AtomicLong lockStarvationCounter; + private final LoadingCache<String, AtomicLong> errorsByTaskCounter; + private final LoadingCache<String, AtomicLong> userErrorsByTaskCounter; + private final LoadingCache<String, AtomicLong> lockStarvationByTaskCounter; + + @Inject + SlaManager(@SlaManagerExecutor ScheduledExecutorService executor, + @MaxParallelCoordinators Integer maxCoordinatorLocks, + 
@MinRequiredInstances Integer minRequiredInstances, + Storage storage, + IServerInfo serverInfo, + @HttpClient AsyncHttpClient httpClient, + TierManager tierManager, + StatsProvider statsProvider) { + + this.executor = requireNonNull(executor); + this.storage = requireNonNull(storage); + this.serverInfo = requireNonNull(serverInfo); + this.httpClient = requireNonNull(httpClient); + this.tierManager = requireNonNull(tierManager); + this.minRequiredInstances = requireNonNull(minRequiredInstances); + this.attemptsCounter = statsProvider.makeCounter(ATTEMPTS_STAT_NAME); + this.successCounter = statsProvider.makeCounter(SUCCESS_STAT_NAME); + this.errorsCounter = statsProvider.makeCounter(ERRORS_STAT_NAME); + this.userErrorsCounter = statsProvider.makeCounter(USER_ERRORS_STAT_NAME); + this.lockStarvationCounter = statsProvider.makeCounter(LOCK_STARVATION_STAT_NAME); + this.lock = Striped.lazyWeakLock(requireNonNull(maxCoordinatorLocks)); + this.errorsByTaskCounter = CacheBuilder.newBuilder().build( + new CacheLoader<String, AtomicLong>() { + @Override + public AtomicLong load(String key) { + return statsProvider.makeCounter(key); + } + } + ); + this.userErrorsByTaskCounter = CacheBuilder.newBuilder().build( + new CacheLoader<String, AtomicLong>() { + @Override + public AtomicLong load(String key) { + return statsProvider.makeCounter(key); + } + } + ); + this.lockStarvationByTaskCounter = CacheBuilder.newBuilder().build( + new CacheLoader<String, AtomicLong>() { + @Override + public AtomicLong load(String key) { + return statsProvider.makeCounter(key); + } + } + ); + } + + private long getSlaDuration(ISlaPolicy slaPolicy) { + if (slaPolicy.isSetPercentageSlaPolicy()) { + return slaPolicy.getPercentageSlaPolicy().getDurationSecs(); + } else if (slaPolicy.isSetCountSlaPolicy()) { + return slaPolicy.getCountSlaPolicy().getDurationSecs(); + } + + throw new IllegalArgumentException("Expected a percentage/count sla policy."); + } + + private boolean meetsSLAInstances(ISlaPolicy 
slaPolicy, long running, long total) { + if (slaPolicy.isSetPercentageSlaPolicy()) { + double percentageRunning = ((double) running / total) * 100.0; + double percentageRequired = slaPolicy.getPercentageSlaPolicy().getPercentage(); + long numMaintenance = (long) ((1 - percentageRequired / 100.0) * total); + if (numMaintenance < 1) { + double updated = ((total - 1) / (double) total) * 100; + LOG.warn("Invalid PercentageSlaPolicy(percentage={}) for task with {} instances " + + "that allows {} instances to be in maintenance." + + "Using percentage={} which allows 1 instance to be under maintenance.", + percentageRequired, total, numMaintenance, updated); + percentageRequired = updated; + } + return percentageRunning >= percentageRequired; + } else if (slaPolicy.isSetCountSlaPolicy()) { + long numRequired = slaPolicy.getCountSlaPolicy().getCount(); + long numMaintenance = total - numRequired; + if (numMaintenance < 1) { + long updated = total - 1; + LOG.warn("Invalid CountSlaPolicy(count={}) for task with {} instances " + + "that allows {} instances to be in maintenance." + + "Using count={} which allows 1 instance to be under maintenance.", + numRequired, total, numMaintenance, updated); + numRequired = updated; + } + return running >= numRequired; + } + + throw new IllegalArgumentException("Expected a percentage/count sla policy."); + } + + private boolean meetsSLADuration(IScheduledTask task, Amount<Long, Time> slaDuration) { + return task.getTaskEvents() + .stream() + .filter(te -> te.getStatus() == ScheduleStatus.RUNNING) + .map(ITaskEvent::getTimestamp) + .max(Long::compare) + .map(t -> System.currentTimeMillis() - t > slaDuration.as(Time.MILLISECONDS)) + .orElse(false); + } + + private boolean checkSla(IScheduledTask task, ISlaPolicy slaPolicy, StoreProvider store) { + // Find the number of active tasks for the job, it will be used as the set of tasks + // against which the percentage/count based SLA will be calculated against. 
+ final long numActive = store.getTaskStore().fetchTasks( + Query.jobScoped(task.getAssignedTask().getTask().getJob()).active() + ).size(); + + if (skipSla(task, numActive)) { + LOG.info("Skip SLA for {} because it is not production or does not have enough instances.", + Tasks.id(task)); + return true; + } + + // Find tasks which have been RUNNING for the required SLA duration. + Amount<Long, Time> slaDuration = new TimeAmount(getSlaDuration(slaPolicy), Time.SECONDS); + final Set<IScheduledTask> running = store.getTaskStore().fetchTasks( + Query.jobScoped(task.getAssignedTask().getTask().getJob()) + .byStatus(ScheduleStatus.RUNNING)) + .stream() + .filter(t -> !Tasks.id(t).equals(Tasks.id(task))) // exclude the task to be removed + .filter(t -> meetsSLADuration(t, slaDuration)) // task is running for sla duration + .collect(Collectors.toSet()); + + // Check it we satisfy the number of RUNNING tasks per duration time. + boolean meetsSla = meetsSLAInstances(slaPolicy, running.size(), numActive); + + LOG.info("SlaCheck: {}, {} tasks unaffected after updating state for {}.", + meetsSla, + running.size(), + Tasks.id(task)); + + return meetsSla; + } + + /** + * Performs the supplied {@link Storage.MutateWork} after checking with the configured + * coordinator endpoint to make sure it is safe to perform the work. + * + * NOTE: Both the SLA check and the {@link Storage.MutateWork} will be performed within a + * {@link Lock} that is indexed by the coordinator url. We do this to make sure that mutations + * to the SLA are performed atomically, so the Coordinator does not have to track + * concurrent requests and simulate SLA changes. + * + * @param task Task whose SLA is to checked. + * @param slaPolicy {@link ICoordinatorSlaPolicy} to use for checking SLA. + * @param work {@link Storage.MutateWork} to perform, if SLA is satisfied. + * @param <T> The type of result the {@link Storage.MutateWork} produces. 
+ * @param <E> The type of exception the {@link Storage.MutateWork} throw. + */ + private <T, E extends Exception> void askCoordinatorThenAct( + IScheduledTask task, + ICoordinatorSlaPolicy slaPolicy, + Storage.MutateWork<T, E> work) { + + String taskKey = getTaskKey(task); + + LOG.debug("Awaiting lock on coordinator: {} for task: {}", + slaPolicy.getCoordinatorUrl(), + taskKey); + Lock l = lock.get(slaPolicy.getCoordinatorUrl()); + if (l.tryLock()) { + try { + LOG.info("Acquired lock on coordinator: {} for task: {}", + slaPolicy.getCoordinatorUrl(), + taskKey); + attemptsCounter.incrementAndGet(); + + if (coordinatorAllows(task, taskKey, slaPolicy)) { + LOG.info("Performing work after coordinator: {} approval for task: {}", + slaPolicy.getCoordinatorUrl(), + taskKey); + storage.write(work); + } + } catch (RuntimeException e) { + LOG.error("Unexpected failure during coordinator sla check against: {} for task: {}", + slaPolicy.getCoordinatorUrl(), + taskKey, + e); + throw e; + } catch (Exception e) { + incrementErrorCount(ERRORS_STAT_NAME, taskKey); + LOG.error("Failed to talk to coordinator: {} for task: {}", + slaPolicy.getCoordinatorUrl(), + taskKey, + e); + } finally { + LOG.info("Releasing lock for coordinator: {} and task: {}", + slaPolicy.getCoordinatorUrl(), + taskKey); + l.unlock(); + } + } else { + incrementErrorCount(LOCK_STARVATION_STAT_NAME, taskKey); + LOG.info("Failed to acquire lock on coordinator: {} for task: {}", + slaPolicy.getCoordinatorUrl(), + taskKey); + } + } + + private boolean coordinatorAllows( + IScheduledTask task, + String taskKey, + ICoordinatorSlaPolicy slaPolicy) + throws InterruptedException, ExecutionException, TException { + + LOG.info("Checking coordinator: {} for task: {}", slaPolicy.getCoordinatorUrl(), taskKey); + + Response response = httpClient.preparePost(slaPolicy.getCoordinatorUrl()) + .setQueryParams(ImmutableList.of(new Param(TASK_PARAM, taskKey))) + .setBody(new TSerializer(new 
TSimpleJSONProtocol.Factory()).toString(task.newBuilder())) + .execute() + .get(); + + if (response.getStatusCode() != HttpConstants.ResponseStatusCodes.OK_200) { + LOG.error("Request failed to coordinator: {} for task: {}. Response: {}", + slaPolicy.getCoordinatorUrl(), + taskKey, + response.getStatusCode()); + incrementErrorCount(USER_ERRORS_STAT_NAME, taskKey); + return false; + } + + successCounter.incrementAndGet(); + String json = response.getResponseBody(); + LOG.info("Got response: {} from {} for task: {}", + json, + slaPolicy.getCoordinatorUrl(), + taskKey); + + Map<String, Boolean> result = new Gson().fromJson( + json, + new TypeReference<Map<String, Boolean>>() { }.getType()); + + return result.get(slaPolicy.isSetStatusKey() ? slaPolicy.getStatusKey() : "drain"); + } + + @VisibleForTesting + String getTaskKey(IScheduledTask task) { + IJobKey jobKey = Tasks.getJob(task); + int instanceId = Tasks.getInstanceId(task); + return String.format("%s/%s/%s/%s/%s", serverInfo.getClusterName(), + jobKey.getRole(), jobKey.getEnvironment(), jobKey.getName(), instanceId); + } + + private void incrementErrorCount(String prefix, String taskKey) { + try { + if (prefix.equals(ERRORS_STAT_NAME)) { + errorsCounter.incrementAndGet(); + errorsByTaskCounter.get(prefix + "_" + taskKey).incrementAndGet(); + } + + if (prefix.equals(USER_ERRORS_STAT_NAME)) { + userErrorsCounter.incrementAndGet(); + userErrorsByTaskCounter.get(prefix + "_" + taskKey).incrementAndGet(); + } + + if (prefix.equals(LOCK_STARVATION_STAT_NAME)) { + lockStarvationCounter.incrementAndGet(); + lockStarvationByTaskCounter.get(prefix + "_" + taskKey).incrementAndGet(); + } + } catch (ExecutionException e) { + LOG.error("Failed increment failure metrics for task: {}", taskKey, e); + } + } + + /** + * Checks the SLA for the given task using the {@link ISlaPolicy} supplied. + * + * @param task Task whose SLA is to be checked. + * @param slaPolicy {@link ISlaPolicy} to use. 
+ * @param work {@link Storage.MutateWork} to perform, if SLA is satisfied. + * @param force boolean to indicate if work should be performed without checking SLA. + * @param <T> The type of result the {@link Storage.MutateWork} produces. + * @param <E> The type of exception the {@link Storage.MutateWork} throw. + * @throws E raises exception of type E if the {@link Storage.MutateWork} throws. + */ + @Timed + public <T, E extends Exception> void checkSlaThenAct( + IScheduledTask task, + ISlaPolicy slaPolicy, + Storage.MutateWork<T, E> work, + boolean force) throws E { + + if (force) { + LOG.info("Forcing work without applying SlaPolicy: {} for {}", slaPolicy, Tasks.id(task)); + storage.write(work); + return; + } + + LOG.info("Using SlaPolicy: {} for {}", slaPolicy, Tasks.id(task)); + + // has custom coordinator sla policy + if (slaPolicy.isSetCoordinatorSlaPolicy()) { + // schedule work to perform coordinated transition + executor.execute(() -> askCoordinatorThenAct( + task, + slaPolicy.getCoordinatorSlaPolicy(), + work)); + } else { + // verify sla and perform work if satisfied + storage.write(store -> { + if (checkSla(task, slaPolicy, store)) { + work.apply(store); + } + return null; // TODO(sshanmugham) we need to satisfy the interface, refactor later + }); + } + } + + private boolean skipSla(IScheduledTask task, long numActive) { + if (!tierManager.getTier(task.getAssignedTask().getTask()).isPreemptible() + && !tierManager.getTier(task.getAssignedTask().getTask()).isRevocable()) { + return numActive < minRequiredInstances; + } + return true; + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/main/java/org/apache/aurora/scheduler/sla/SlaModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/sla/SlaModule.java b/src/main/java/org/apache/aurora/scheduler/sla/SlaModule.java index 25ed474..07082a9 100644 --- 
a/src/main/java/org/apache/aurora/scheduler/sla/SlaModule.java +++ b/src/main/java/org/apache/aurora/scheduler/sla/SlaModule.java @@ -18,7 +18,6 @@ import java.lang.annotation.Target; import java.util.List; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; - import javax.inject.Inject; import javax.inject.Qualifier; @@ -30,6 +29,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.AbstractIdleService; import com.google.inject.AbstractModule; import com.google.inject.Singleton; +import com.google.inject.TypeLiteral; import org.apache.aurora.common.quantity.Time; import org.apache.aurora.scheduler.SchedulerServicesModule; @@ -39,6 +39,9 @@ import org.apache.aurora.scheduler.config.types.TimeAmount; import org.apache.aurora.scheduler.config.validators.PositiveAmount; import org.apache.aurora.scheduler.sla.MetricCalculator.MetricCalculatorSettings; import org.apache.aurora.scheduler.sla.MetricCalculator.MetricCategory; +import org.asynchttpclient.AsyncHttpClient; +import org.asynchttpclient.DefaultAsyncHttpClientConfig; +import org.asynchttpclient.channel.DefaultKeepAliveStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,6 +54,7 @@ import static java.util.Objects.requireNonNull; import static org.apache.aurora.scheduler.sla.MetricCalculator.MetricCategory.JOB_UPTIMES; import static org.apache.aurora.scheduler.sla.MetricCalculator.MetricCategory.MEDIANS; import static org.apache.aurora.scheduler.sla.MetricCalculator.MetricCategory.PLATFORM_UPTIME; +import static org.asynchttpclient.Dsl.asyncHttpClient; /** * Binding module for the sla processor. 
@@ -76,6 +80,27 @@ public class SlaModule extends AbstractModule { description = "Metric categories collected for non production tasks.", splitter = CommaSplitter.class) public List<MetricCategory> slaNonProdMetrics = ImmutableList.of(); + + @Parameter(names = "-sla_coordinator_timeout", + validateValueWith = PositiveAmount.class, + description = "Timeout interval for communicating with Coordinator.") + public TimeAmount slaCoordinatorTimeout = new TimeAmount(1, Time.MINUTES); + + @Parameter(names = "-max_parallel_coordinated_maintenance", + description = "Maximum number of coordinators that can be contacted in parallel.") + public Integer maxParallelCoordinators = 10; + + @Parameter(names = "-min_required_instances_for_sla_check", + description = "Minimum number of instances required for a job to be eligible for SLA " + + "check. This does not apply to jobs that have a CoordinatorSlaPolicy.") + public Integer minRequiredInstances = 20; + + @Parameter(names = "-max_sla_duration_secs", + validateValueWith = PositiveAmount.class, + description = "Maximum duration window for which SLA requirements are to be satisfied." + + "This does not apply to jobs that have a CoordinatorSlaPolicy." 
+ ) + public TimeAmount maxSlaDuration = new TimeAmount(2, Time.HOURS); } @VisibleForTesting @@ -104,6 +129,36 @@ public class SlaModule extends AbstractModule { bind(SlaUpdater.class).in(Singleton.class); SchedulerServicesModule.addSchedulerActiveServiceBinding(binder()).to(SlaUpdater.class); + + DefaultAsyncHttpClientConfig config = new DefaultAsyncHttpClientConfig.Builder() + .setConnectTimeout(options.slaCoordinatorTimeout.as(Time.MILLISECONDS).intValue()) + .setHandshakeTimeout(options.slaCoordinatorTimeout.as(Time.MILLISECONDS).intValue()) + .setSslSessionTimeout(options.slaCoordinatorTimeout.as(Time.MILLISECONDS).intValue()) + .setReadTimeout(options.slaCoordinatorTimeout.as(Time.MILLISECONDS).intValue()) + .setRequestTimeout(options.slaCoordinatorTimeout.as(Time.MILLISECONDS).intValue()) + .setKeepAliveStrategy(new DefaultKeepAliveStrategy()) + .build(); + AsyncHttpClient httpClient = asyncHttpClient(config); + + bind(AsyncHttpClient.class) + .annotatedWith(SlaManager.HttpClient.class) + .toInstance(httpClient); + + bind(new TypeLiteral<Integer>() { }) + .annotatedWith(SlaManager.MinRequiredInstances.class) + .toInstance(options.minRequiredInstances); + + bind(new TypeLiteral<Integer>() { }) + .annotatedWith(SlaManager.MaxParallelCoordinators.class) + .toInstance(options.maxParallelCoordinators); + + bind(ScheduledExecutorService.class) + .annotatedWith(SlaManager.SlaManagerExecutor.class) + .toInstance(AsyncUtil.loggingScheduledExecutor( + options.maxParallelCoordinators, + "SlaManager-%d", LOG)); + + bind(SlaManager.class).in(javax.inject.Singleton.class); } // TODO(ksweeney): This should use AbstractScheduledService.