http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/test/java/org/apache/aurora/scheduler/sla/SlaManagerTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/sla/SlaManagerTest.java b/src/test/java/org/apache/aurora/scheduler/sla/SlaManagerTest.java new file mode 100644 index 0000000..759a1bc --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/sla/SlaManagerTest.java @@ -0,0 +1,1239 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.scheduler.sla; + +import java.io.IOException; +import java.net.URLEncoder; +import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableSet; +import com.google.inject.AbstractModule; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.TypeLiteral; + +import org.apache.aurora.common.stats.StatsProvider; +import org.apache.aurora.common.testing.easymock.EasyMockTest; +import org.apache.aurora.gen.CoordinatorSlaPolicy; +import org.apache.aurora.gen.CountSlaPolicy; +import org.apache.aurora.gen.PercentageSlaPolicy; +import org.apache.aurora.gen.ScheduleStatus; +import org.apache.aurora.gen.ScheduledTask; +import org.apache.aurora.gen.ServerInfo; +import org.apache.aurora.gen.SlaPolicy; +import org.apache.aurora.scheduler.TierManager; +import org.apache.aurora.scheduler.base.AsyncUtil; +import org.apache.aurora.scheduler.base.Query; +import org.apache.aurora.scheduler.base.TaskTestUtil; +import org.apache.aurora.scheduler.base.Tasks; +import org.apache.aurora.scheduler.state.StateManager; +import org.apache.aurora.scheduler.storage.Storage; +import org.apache.aurora.scheduler.storage.entities.IScheduledTask; +import org.apache.aurora.scheduler.storage.entities.IServerInfo; +import org.apache.aurora.scheduler.storage.entities.ISlaPolicy; +import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; +import org.apache.aurora.scheduler.testing.FakeStatsProvider; +import org.apache.thrift.TException; +import org.apache.thrift.TSerializer; +import org.apache.thrift.protocol.TSimpleJSONProtocol; +import org.asynchttpclient.AsyncHttpClient; 
+import org.asynchttpclient.DefaultAsyncHttpClient; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.handler.AbstractHandler; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.aurora.gen.ScheduleStatus.PENDING; +import static org.apache.aurora.gen.ScheduleStatus.RUNNING; +import static org.apache.aurora.scheduler.base.TaskTestUtil.TIER_MANAGER; +import static org.easymock.EasyMock.expect; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.fail; + +public class SlaManagerTest extends EasyMockTest { + + private static final Logger LOG = LoggerFactory.getLogger(SlaManagerTest.class); + private static final String CLUSTER_NAME = "test_cluster"; + private static final String STATS_URL_PREFIX = "fake_url"; + private static final String HOST_A = "a"; + + private static final ISlaPolicy PERCENTAGE_SLA_POLICY = ISlaPolicy.build( + SlaPolicy.percentageSlaPolicy( + new PercentageSlaPolicy() + .setPercentage(66) + .setDurationSecs(1800))); + + private static final ISlaPolicy COUNT_SLA_POLICY = ISlaPolicy.build( + SlaPolicy.countSlaPolicy( + new CountSlaPolicy() + .setCount(2) + .setDurationSecs(1800))); + + private AsyncHttpClient httpClient; + private SlaManager slaManager; + private StorageTestUtil storageUtil; + private StateManager stateManager; + private IServerInfo serverInfo; + private Server jettyServer; + private CountDownLatch coordinatorResponded; + + @Before + public void setUp() { + jettyServer = new Server(0); // Start Jetty server with ephemeral port + storageUtil = new StorageTestUtil(this); + storageUtil.expectOperations(); + stateManager = createMock(StateManager.class); + httpClient = new DefaultAsyncHttpClient(); + coordinatorResponded = new CountDownLatch(1); + + serverInfo = IServerInfo.build( + new ServerInfo() + 
.setClusterName(CLUSTER_NAME) + .setStatsUrlPrefix(STATS_URL_PREFIX)); + + Injector injector = Guice.createInjector( + new AbstractModule() { + @Override + protected void configure() { + bind(Storage.class).toInstance(storageUtil.storage); + bind(StateManager.class).toInstance(stateManager); + bind(StatsProvider.class).toInstance(new FakeStatsProvider()); + bind(TierManager.class).toInstance(TIER_MANAGER); + bind(AsyncHttpClient.class) + .annotatedWith(SlaManager.HttpClient.class) + .toInstance(httpClient); + + bind(new TypeLiteral<Integer>() { }) + .annotatedWith(SlaManager.MinRequiredInstances.class) + .toInstance(2); + + bind(new TypeLiteral<Integer>() { }) + .annotatedWith(SlaManager.MaxParallelCoordinators.class) + .toInstance(10); + + bind(ScheduledExecutorService.class) + .annotatedWith(SlaManager.SlaManagerExecutor.class) + .toInstance(AsyncUtil.loggingScheduledExecutor( + 10, "SlaManagerTest-%d", LOG)); + + bind(IServerInfo.class).toInstance(serverInfo); + } + } + ); + slaManager = injector.getInstance(SlaManager.class); + + addTearDown(() -> jettyServer.stop()); + } + + private static IScheduledTask makeTask( + String taskId, + int instanceId, + ScheduleStatus status) { + return makeTask(taskId, instanceId, status, 1000, true); + } + + private static IScheduledTask makeTask( + String taskId, + int instanceId, + long runningSince) { + return makeTask(taskId, instanceId, RUNNING, runningSince, true); + } + + private static IScheduledTask makeTask( + String taskId, + int instanceId, + ScheduleStatus status, + long runningSince, + boolean prod) { + ScheduledTask builder = TaskTestUtil.addStateTransition( + TaskTestUtil.makeTask(taskId, TaskTestUtil.JOB, instanceId, prod), + status, + runningSince).newBuilder(); + builder.getAssignedTask().setSlaveHost(SlaManagerTest.HOST_A); + return IScheduledTask.build(builder); + } + + /** + * Verifies that SLA check passes and the supplied {@link Storage.MutateWork} gets executed + * for a job that has {@link 
CountSlaPolicy#count} + 1 tasks that have been RUNNING for + * the required {@link CountSlaPolicy#durationSecs} with {@link CountSlaPolicy#count} set to 2. + */ + @Test + public void testCheckCountSlaPassesThenActs() { + IScheduledTask task1 = makeTask("taskA", 1, RUNNING); + IScheduledTask task2 = makeTask("taskB", 2, RUNNING); + IScheduledTask task3 = makeTask("taskC", 3, RUNNING); + + // mock calls to fetch all active tasks for the job for sla calculation + expect(storageUtil.taskStore.fetchTasks(Query.jobScoped(Tasks.getJob(task1)).active())) + .andReturn(ImmutableSet.of(task1, task2, task3)); + + // mock calls to fetch all RUNNING tasks for the job for sla calculation + expect(storageUtil.taskStore.fetchTasks(Query.jobScoped(Tasks.getJob(task1)).byStatus(RUNNING))) + .andReturn(ImmutableSet.of(task1, task2, task3)); + + // expect that the fetchTask in the work is called, after sla check passes + expect(storageUtil.taskStore.fetchTask(task1.getAssignedTask().getTaskId())) + .andReturn(Optional.of(task1)); + + control.replay(); + + slaManager.checkSlaThenAct( + task1, + COUNT_SLA_POLICY, + storeProvider -> storeProvider + .getUnsafeTaskStore() + .fetchTask(task1.getAssignedTask().getTaskId()), + false); + } + + /** + * Verifies that SLA check gets overridden, passes and the supplied {@link Storage.MutateWork} is + * executed for a job that has and aggressive {@link CountSlaPolicy#count} > total number of + * instances. 
+ */ + @Test + public void testCheckCountSlaOverridesAggressiveCountPassesThenActs() { + IScheduledTask task1 = makeTask("taskA", 1, RUNNING); + IScheduledTask task2 = makeTask("taskB", 2, RUNNING); + IScheduledTask task3 = makeTask("taskC", 3, RUNNING); + + // mock calls to fetch all active tasks for the job for sla calculation + expect(storageUtil.taskStore.fetchTasks(Query.jobScoped(Tasks.getJob(task1)).active())) + .andReturn(ImmutableSet.of(task1, task2, task3)); + + // mock calls to fetch all RUNNING tasks for the job for sla calculation + expect(storageUtil.taskStore.fetchTasks(Query.jobScoped(Tasks.getJob(task1)).byStatus(RUNNING))) + .andReturn(ImmutableSet.of(task1, task2, task3)); + + // expect that the fetchTask in the work is called, after sla check passes + expect(storageUtil.taskStore.fetchTask(task1.getAssignedTask().getTaskId())) + .andReturn(Optional.of(task1)); + + control.replay(); + + slaManager.checkSlaThenAct( + task1, + ISlaPolicy.build( + SlaPolicy.countSlaPolicy( + new CountSlaPolicy() + .setCount(3) + .setDurationSecs(1800))), + storeProvider -> storeProvider + .getUnsafeTaskStore() + .fetchTask(task1.getAssignedTask().getTaskId()), + false); + } + + /** + * Verifies that SLA check gets overridden, fails and the supplied {@link Storage.MutateWork} is + * not executed for a job that has and aggressive {@link CountSlaPolicy#count} > total number of + * instances. 
+ */ + @Test + public void testCheckCountSlaOverridesAggressiveCountFailsAct() { + IScheduledTask task1 = makeTask("taskA", 1, RUNNING); + IScheduledTask task2 = makeTask("taskB", 2, RUNNING); + IScheduledTask task3 = makeTask("taskC", 3, PENDING); + + // mock calls to fetch all active tasks for the job for sla calculation + expect(storageUtil.taskStore.fetchTasks(Query.jobScoped(Tasks.getJob(task1)).active())) + .andReturn(ImmutableSet.of(task1, task2, task3)); + + // mock calls to fetch all RUNNING tasks for the job for sla calculation + expect(storageUtil.taskStore.fetchTasks(Query.jobScoped(Tasks.getJob(task1)).byStatus(RUNNING))) + .andReturn(ImmutableSet.of(task1, task2, task3)); + + control.replay(); + + slaManager.checkSlaThenAct( + task1, + ISlaPolicy.build( + SlaPolicy.countSlaPolicy( + new CountSlaPolicy() + .setCount(3) + .setDurationSecs(1800))), + storeProvider -> storeProvider + .getUnsafeTaskStore() + .fetchTask(task1.getAssignedTask().getTaskId()), + false); + } + + /** + * Verifies that SLA check fails and the supplied {@link Storage.MutateWork} does not get + * executed for a job that has {@link CountSlaPolicy#count} tasks that have been RUNNING for + * the required {@link CountSlaPolicy#durationSecs} and 1 task in PENDING with + * {@link CountSlaPolicy#count} set to 2. 
+ */ + @Test + public void testCheckCountSlaFailsDoesNotAct() { + IScheduledTask task1 = makeTask("taskA", 1, RUNNING); + IScheduledTask task2 = makeTask("taskB", 2, RUNNING); + IScheduledTask task3 = makeTask("taskC", 3, PENDING); + + // mock calls to fetch all active tasks for the job for sla calculation + expect(storageUtil.taskStore.fetchTasks(Query.jobScoped(Tasks.getJob(task1)).active())) + .andReturn(ImmutableSet.of(task1, task2, task3)); + + // mock calls to fetch all RUNNING tasks for the job for sla calculation + expect(storageUtil.taskStore.fetchTasks(Query.jobScoped(Tasks.getJob(task1)).byStatus(RUNNING))) + .andReturn(ImmutableSet.of(task1, task2, task3)); + + control.replay(); + + slaManager.checkSlaThenAct( + task1, + COUNT_SLA_POLICY, + storeProvider -> storeProvider + .getUnsafeTaskStore() + .fetchTask(task1.getAssignedTask().getTaskId()), + false); + } + + /** + * Verifies that SLA check is skipped and the supplied {@link Storage.MutateWork} gets + * executed for a job that does not meet {@link SlaManager#minRequiredInstances}. 
+ */ + @Test + public void testCheckCountSlaFailsDueToMinRequiredInstancesActs() { + IScheduledTask task1 = makeTask("taskA", 1, RUNNING); + + // mock calls to fetch all active tasks for the job for sla calculation + expect(storageUtil.taskStore.fetchTasks(Query.jobScoped(Tasks.getJob(task1)).active())) + .andReturn(ImmutableSet.of(task1)); + + // expect that the fetchTask in the work is called, after sla check passes + expect(storageUtil.taskStore.fetchTask(task1.getAssignedTask().getTaskId())) + .andReturn(Optional.of(task1)); + + control.replay(); + + slaManager.checkSlaThenAct( + task1, + ISlaPolicy.build( + SlaPolicy.countSlaPolicy( + new CountSlaPolicy() + .setCount(0) + .setDurationSecs(0))), + storeProvider -> storeProvider + .getUnsafeTaskStore() + .fetchTask(task1.getAssignedTask().getTaskId()), + false); + } + + /** + * Verifies that SLA check is skipped and the supplied {@link Storage.MutateWork} gets + * executed for a job is not the correct tier. + */ + @Test + public void testCheckCountSlaFailsDueToTierActs() { + IScheduledTask task1 = makeTask("taskA", 1, RUNNING, 1000, false); + + // mock calls to fetch all active tasks for the job for sla calculation + expect(storageUtil.taskStore.fetchTasks(Query.jobScoped(Tasks.getJob(task1)).active())) + .andReturn(ImmutableSet.of(task1)); + + // expect that the fetchTask in the work is called, after sla check passes + expect(storageUtil.taskStore.fetchTask(task1.getAssignedTask().getTaskId())) + .andReturn(Optional.of(task1)); + + control.replay(); + + slaManager.checkSlaThenAct( + task1, + ISlaPolicy.build( + SlaPolicy.countSlaPolicy( + new CountSlaPolicy() + .setCount(0) + .setDurationSecs(0))), + storeProvider -> storeProvider + .getUnsafeTaskStore() + .fetchTask(task1.getAssignedTask().getTaskId()), + false); + } + + /** + * Verifies that SLA check fails and the supplied {@link Storage.MutateWork} does not get + * executed for a job that has {@link CountSlaPolicy#count} tasks that have been RUNNING for + * 
the required {@link CountSlaPolicy#durationSecs} and 1 task in RUNNING for less than + * {@link CountSlaPolicy#durationSecs} with {@link CountSlaPolicy#count} set to 2. + */ + @Test + public void testCheckCountSlaFailsDueToRunningTimeDoesNotAct() { + IScheduledTask task1 = makeTask("taskA", 1, RUNNING); + IScheduledTask task2 = makeTask("taskB", 2, RUNNING); + IScheduledTask task3 = makeTask("taskC", 3, System.currentTimeMillis()); + + // mock calls to fetch all active tasks for the job for sla calculation + expect(storageUtil.taskStore.fetchTasks(Query.jobScoped(Tasks.getJob(task1)).active())) + .andReturn(ImmutableSet.of(task1, task2, task3)); + + // mock calls to fetch all RUNNING tasks for the job for sla calculation + expect(storageUtil.taskStore.fetchTasks(Query.jobScoped(Tasks.getJob(task1)).byStatus(RUNNING))) + .andReturn(ImmutableSet.of(task1, task2, task3)); + + control.replay(); + + slaManager.checkSlaThenAct( + task1, + COUNT_SLA_POLICY, + storeProvider -> storeProvider + .getUnsafeTaskStore() + .fetchTask(task1.getAssignedTask().getTaskId()), + false); + } + + /** + * Verifies that when {@code force} is {@code True} supplied {@link Storage.MutateWork} gets + * executed for a job that has 1 task in RUNNING for less than {@link CountSlaPolicy#durationSecs} + * with {@link CountSlaPolicy#count} set to 2, without checking SLA. 
+ */ + @Test + public void testCheckCountSlaCheckForceAct() { + IScheduledTask task1 = makeTask("taskA", 1, System.currentTimeMillis()); + + // expect that the fetchTask is the work is called after force + expect(storageUtil.taskStore.fetchTask(task1.getAssignedTask().getTaskId())) + .andReturn(Optional.of(task1)); + + control.replay(); + + slaManager.checkSlaThenAct( + task1, + COUNT_SLA_POLICY, + storeProvider -> storeProvider + .getUnsafeTaskStore() + .fetchTask(task1.getAssignedTask().getTaskId()), + true); + } + + /** + * Verifies that SLA check passes and the supplied {@link Storage.MutateWork} gets executed + * for a job that has more {@link PercentageSlaPolicy#percentage} tasks that have been RUNNING for + * the required {@link PercentageSlaPolicy#durationSecs} with + * {@link PercentageSlaPolicy#percentage} set to 66. + */ + @Test + public void testCheckPercentageSlaPassesThenActs() { + IScheduledTask task1 = makeTask("taskA", 1, RUNNING); + IScheduledTask task2 = makeTask("taskB", 2, RUNNING); + IScheduledTask task3 = makeTask("taskC", 3, RUNNING); + + // mock calls to fetch all active tasks for the job for sla calculation + expect(storageUtil.taskStore.fetchTasks(Query.jobScoped(Tasks.getJob(task1)).active())) + .andReturn(ImmutableSet.of(task1, task2, task3)); + + // mock calls to fetch all RUNNING tasks for the job for sla calculation + expect(storageUtil.taskStore.fetchTasks(Query.jobScoped(Tasks.getJob(task1)).byStatus(RUNNING))) + .andReturn(ImmutableSet.of(task1, task2, task3)); + + // expect that the fetchTask in the work is called, after sla check passes + expect(storageUtil.taskStore.fetchTask(task1.getAssignedTask().getTaskId())) + .andReturn(Optional.of(task1)); + + control.replay(); + + slaManager.checkSlaThenAct( + task1, + PERCENTAGE_SLA_POLICY, + storeProvider -> storeProvider + .getUnsafeTaskStore() + .fetchTask(task1.getAssignedTask().getTaskId()), + false); + } + + /** + * Verifies that SLA check overridden and the supplied {@link 
Storage.MutateWork} gets executed + * for a job that requires aggressive {@link PercentageSlaPolicy#percentage} tasks to be RUNNING + * for the required {@link PercentageSlaPolicy#durationSecs} with + * {@link PercentageSlaPolicy#percentage} set to 67. + */ + @Test + public void testCheckPercentageSlaWithAggressivePercentagePassesThenActs() { + IScheduledTask task1 = makeTask("taskA", 1, RUNNING); + IScheduledTask task2 = makeTask("taskB", 2, RUNNING); + IScheduledTask task3 = makeTask("taskC", 3, RUNNING); + + // mock calls to fetch all active tasks for the job for sla calculation + expect(storageUtil.taskStore.fetchTasks(Query.jobScoped(Tasks.getJob(task1)).active())) + .andReturn(ImmutableSet.of(task1, task2, task3)); + + // mock calls to fetch all RUNNING tasks for the job for sla calculation + expect(storageUtil.taskStore.fetchTasks(Query.jobScoped(Tasks.getJob(task1)).byStatus(RUNNING))) + .andReturn(ImmutableSet.of(task1, task2, task3)); + + // expect that the fetchTask in the work is called, after sla check passes + expect(storageUtil.taskStore.fetchTask(task1.getAssignedTask().getTaskId())) + .andReturn(Optional.of(task1)); + + control.replay(); + + // We set the required running percentage to 67 which will allow 0 instances to be in + // maintenance is overridden to allow exactly 1 instance to be in maintenance. + slaManager.checkSlaThenAct( + task1, + ISlaPolicy.build( + SlaPolicy.percentageSlaPolicy( + new PercentageSlaPolicy() + .setPercentage(67) + .setDurationSecs(1800))), + storeProvider -> storeProvider + .getUnsafeTaskStore() + .fetchTask(task1.getAssignedTask().getTaskId()), + false); + } + + /** + * Verifies that SLA check is overridden and the supplied {@link Storage.MutateWork} gets + * executed for a job that requires aggressive {@link PercentageSlaPolicy#percentage} tasks to + * be RUNNING for the required {@link PercentageSlaPolicy#durationSecs} with + * {@link PercentageSlaPolicy#percentage} set to 67. 
+ */ + @Test + public void testCheckPercentageSlaWithAggressivePercentageFailsAct() { + IScheduledTask task1 = makeTask("taskA", 1, RUNNING); + IScheduledTask task2 = makeTask("taskB", 2, RUNNING); + IScheduledTask task3 = makeTask("taskC", 3, PENDING); + + // mock calls to fetch all active tasks for the job for sla calculation + expect(storageUtil.taskStore.fetchTasks(Query.jobScoped(Tasks.getJob(task1)).active())) + .andReturn(ImmutableSet.of(task1, task2, task3)); + + // mock calls to fetch all RUNNING tasks for the job for sla calculation + expect(storageUtil.taskStore.fetchTasks(Query.jobScoped(Tasks.getJob(task1)).byStatus(RUNNING))) + .andReturn(ImmutableSet.of(task1, task2, task3)); + + control.replay(); + + // We set the required running percentage to 67 which will allow 0 instances to be in + // maintenance is not overridden to allow 1 instance to be in maintenance. + slaManager.checkSlaThenAct( + task1, + ISlaPolicy.build( + SlaPolicy.percentageSlaPolicy( + new PercentageSlaPolicy() + .setPercentage(67) + .setDurationSecs(1800))), + storeProvider -> storeProvider + .getUnsafeTaskStore() + .fetchTask(task1.getAssignedTask().getTaskId()), + false); + } + + /** + * Verifies that SLA check fails and the supplied {@link Storage.MutateWork} does not get + * executed for a job that has {@link PercentageSlaPolicy#percentage} tasks that have been + * RUNNING for the required {@link PercentageSlaPolicy#durationSecs} and 1 task in PENDING with + * {@link PercentageSlaPolicy#percentage} set to 66. 
+ */ + @Test + public void testCheckPercentageSlaFailsDoesNotAct() { + IScheduledTask task1 = makeTask("taskA", 1, RUNNING); + IScheduledTask task2 = makeTask("taskB", 2, RUNNING); + IScheduledTask task3 = makeTask("taskC", 3, PENDING); + + // mock calls to fetch all active tasks for the job for sla calculation + expect(storageUtil.taskStore.fetchTasks(Query.jobScoped(Tasks.getJob(task1)).active())) + .andReturn(ImmutableSet.of(task1, task2, task3)); + + // mock calls to fetch all RUNNING tasks for the job for sla calculation + expect(storageUtil.taskStore.fetchTasks(Query.jobScoped(Tasks.getJob(task1)).byStatus(RUNNING))) + .andReturn(ImmutableSet.of(task1, task2, task3)); + + control.replay(); + + slaManager.checkSlaThenAct( + task1, + PERCENTAGE_SLA_POLICY, + storeProvider -> storeProvider + .getUnsafeTaskStore() + .fetchTask(task1.getAssignedTask().getTaskId()), + false); + } + + /** + * Verifies that SLA check is skipped and the supplied {@link Storage.MutateWork} gets + * executed for a job that does not meet {@link SlaManager#minRequiredInstances}. 
+ */ + @Test + public void testCheckPercentageSlaFailsDueToMinRequiredInstancesActs() { + IScheduledTask task1 = makeTask("taskA", 1, RUNNING); + + // mock calls to fetch all active tasks for the job for sla calculation + expect(storageUtil.taskStore.fetchTasks(Query.jobScoped(Tasks.getJob(task1)).active())) + .andReturn(ImmutableSet.of(task1)); + + // expect that the fetchTask in the work is called, after sla check passes + expect(storageUtil.taskStore.fetchTask(task1.getAssignedTask().getTaskId())) + .andReturn(Optional.of(task1)); + + control.replay(); + + slaManager.checkSlaThenAct( + task1, + ISlaPolicy.build( + SlaPolicy.percentageSlaPolicy( + new PercentageSlaPolicy() + .setPercentage(0) + .setDurationSecs(0))), + storeProvider -> storeProvider + .getUnsafeTaskStore() + .fetchTask(task1.getAssignedTask().getTaskId()), + false); + } + + /** + * Verifies that SLA check is skipped and the supplied {@link Storage.MutateWork} gets + * executed for a job is not correct tier. + */ + @Test + public void testCheckPercentageSlaFailsDueToTierActs() { + IScheduledTask task1 = makeTask("taskA", 1, RUNNING, 1000, false); + + // mock calls to fetch all active tasks for the job for sla calculation + expect(storageUtil.taskStore.fetchTasks(Query.jobScoped(Tasks.getJob(task1)).active())) + .andReturn(ImmutableSet.of(task1)); + + // expect that the fetchTask in the work is called, after sla check passes + expect(storageUtil.taskStore.fetchTask(task1.getAssignedTask().getTaskId())) + .andReturn(Optional.of(task1)); + + control.replay(); + + slaManager.checkSlaThenAct( + task1, + ISlaPolicy.build( + SlaPolicy.percentageSlaPolicy( + new PercentageSlaPolicy() + .setPercentage(0) + .setDurationSecs(0))), + storeProvider -> storeProvider + .getUnsafeTaskStore() + .fetchTask(task1.getAssignedTask().getTaskId()), + false); + } + + /** + * Verifies that SLA check fails and the supplied {@link Storage.MutateWork} does not get + * executed for a job that has {@link 
PercentageSlaPolicy#percentage} tasks that have been + * RUNNING for the required {@link PercentageSlaPolicy#durationSecs} and 1 task in RUNNING for + * less than {@link PercentageSlaPolicy#durationSecs} with {@link PercentageSlaPolicy#percentage} + * set to 66. + */ + @Test + public void testCheckPercentageSlaFailsDueToRunningTimeDoesNotAct() { + IScheduledTask task1 = makeTask("taskA", 1, RUNNING); + IScheduledTask task2 = makeTask("taskB", 2, RUNNING); + IScheduledTask task3 = makeTask("taskC", 3, System.currentTimeMillis()); + + // mock calls to fetch all active tasks for the job for sla calculation + expect(storageUtil.taskStore.fetchTasks(Query.jobScoped(Tasks.getJob(task1)).active())) + .andReturn(ImmutableSet.of(task1, task2, task3)); + + // mock calls to fetch all RUNNING tasks for the job for sla calculation + expect(storageUtil.taskStore.fetchTasks(Query.jobScoped(Tasks.getJob(task1)).byStatus(RUNNING))) + .andReturn(ImmutableSet.of(task1, task2, task3)); + + control.replay(); + + slaManager.checkSlaThenAct( + task1, + PERCENTAGE_SLA_POLICY, + storeProvider -> storeProvider + .getUnsafeTaskStore() + .fetchTask(task1.getAssignedTask().getTaskId()), + false); + } + + /** + * Verifies that when {@code force} is {@code True} the supplied {@link Storage.MutateWork} gets + * executed for a job that has 1 task in RUNNING for less than + * {@link PercentageSlaPolicy#durationSecs} + * with {@link PercentageSlaPolicy#percentage} set to 66, without checking SLA. 
+ */ + @Test + public void testCheckPercentageSlaCheckForceAct() { + IScheduledTask task1 = makeTask("taskA", 1, RUNNING); + + // expect that the fetchTask is the work is called after force + expect(storageUtil.taskStore.fetchTask(task1.getAssignedTask().getTaskId())) + .andReturn(Optional.of(task1)); + + control.replay(); + + slaManager.checkSlaThenAct( + task1, + PERCENTAGE_SLA_POLICY, + storeProvider -> storeProvider + .getUnsafeTaskStore() + .fetchTask(task1.getAssignedTask().getTaskId()), + true); + } + + /** + * Verifies that SLA check passes and the supplied {@link Storage.MutateWork} gets executed + * for a job when {@link CoordinatorSlaPolicy#coordinatorUrl} responds + * with {@code {"drain": true}}. + */ + @Test + public void testCheckCoordinatorSlaPassesThenActs() throws Exception { + IScheduledTask task1 = makeTask("taskA", 1, RUNNING); + CountDownLatch workCalled = new CountDownLatch(1); + + jettyServer.setHandler(mockCoordinatorResponse(task1, "{\"drain\": true}")); + jettyServer.start(); + + // expect that the fetchTask in the work is called, after sla check passes + expect(storageUtil.taskStore.fetchTask(task1.getAssignedTask().getTaskId())) + .andReturn(Optional.of(task1)); + + control.replay(); + + while (!coordinatorResponded.await(100, TimeUnit.MILLISECONDS)) { + slaManager.checkSlaThenAct( + task1, + createCoordinatorSlaPolicy(), + storeProvider -> { + // set the marker to indicate that we performed the work + workCalled.countDown(); + storeProvider + .getUnsafeTaskStore() + .fetchTask(task1.getAssignedTask().getTaskId()); + return null; + }, + false); + } + + // wait until we are sure that the server has responded + coordinatorResponded.await(); + workCalled.await(); + + assertEquals(0, coordinatorResponded.getCount()); + // check the work was called + assertEquals(0, workCalled.getCount()); + } + + /** + * Verifies that SLA check passes and the supplied {@link Storage.MutateWork} gets executed + * for a job when {@link 
CoordinatorSlaPolicy#coordinatorUrl} responds + * with {@code {"drain": true}}. + */ + @Test + public void testCheckCoordinatorSlaPassesThenActsCustomStatusKey() throws Exception { + IScheduledTask task1 = makeTask("taskA", 1, RUNNING); + CountDownLatch workCalled = new CountDownLatch(1); + + jettyServer.setHandler(mockCoordinatorResponse(task1, "{\"custom-key\": true}")); + jettyServer.start(); + + // expect that the fetchTask in the work is called, after sla check passes + expect(storageUtil.taskStore.fetchTask(task1.getAssignedTask().getTaskId())) + .andReturn(Optional.of(task1)); + + control.replay(); + + while (!coordinatorResponded.await(100, TimeUnit.MILLISECONDS)) { + slaManager.checkSlaThenAct( + task1, + createCoordinatorSlaPolicy("custom-key"), + storeProvider -> { + // set the marker to indicate that we performed the work + workCalled.countDown(); + storeProvider + .getUnsafeTaskStore() + .fetchTask(task1.getAssignedTask().getTaskId()); + return null; + }, + false); + } + + workCalled.await(); + + assertEquals(0, coordinatorResponded.getCount()); + // check the work was called + assertEquals(0, workCalled.getCount()); + } + + /** + * Verifies that SLA check passes and the supplied {@link Storage.MutateWork} does not get + * executed for a job when {@link CoordinatorSlaPolicy#coordinatorUrl} responds + * with {@code {"drain": false}}. 
+ */ + @Test + public void testCheckCoordinatorSlaFailsDoesNotAct() throws Exception { + IScheduledTask task1 = makeTask("taskA", 1, RUNNING); + CountDownLatch workCalled = new CountDownLatch(1); + + jettyServer.setHandler(mockCoordinatorResponse(task1, "{\"drain\": false}")); + jettyServer.start(); + + control.replay(); + + while (!coordinatorResponded.await(100, TimeUnit.MILLISECONDS)) { + slaManager.checkSlaThenAct( + task1, + createCoordinatorSlaPolicy(), + storeProvider -> { + // set the marker to indicate that we performed the work + workCalled.countDown(); + storeProvider + .getUnsafeTaskStore() + .fetchTask(task1.getAssignedTask().getTaskId()); + return null; + }, + false); + } + + try { + workCalled.await(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + // expected + } + + assertEquals(0, coordinatorResponded.getCount()); + // check the work was not called + assertEquals(1, workCalled.getCount()); + } + + /** + * Verifies that when {@code force} is {@code True} the supplied {@link Storage.MutateWork} gets + * executed without checking the SLA. 
+ */ + @Test + public void testCheckCoordinatorSlaCheckForceAct() throws Exception { + IScheduledTask task1 = makeTask("taskA", 1, RUNNING); + CountDownLatch workCalled = new CountDownLatch(1); + + jettyServer.setHandler(mockCoordinatorResponse(task1, "{\"drain\": false}")); + jettyServer.start(); + + // expect that the fetchTask in the work is called after force + expect(storageUtil.taskStore.fetchTask(task1.getAssignedTask().getTaskId())) + .andReturn(Optional.of(task1)); + + control.replay(); + + slaManager.checkSlaThenAct( + task1, + createCoordinatorSlaPolicy(), + storeProvider -> { + // set the marker to indicate that we performed the work + workCalled.countDown(); + storeProvider + .getUnsafeTaskStore() + .fetchTask(task1.getAssignedTask().getTaskId()); + return null; + }, + true); + + workCalled.await(); + + // coordinator is not contacted + assertEquals(1, coordinatorResponded.getCount()); + // check the work was called + assertEquals(0, workCalled.getCount()); + } + + /** + * Verifies that SLA check fails and the supplied {@link Storage.MutateWork} does not get + * executed for a job when {@link CoordinatorSlaPolicy#coordinatorUrl} responds + * with error. 
+ */ + @Test + public void testCheckCoordinatorSlaErrorDoesNotAct() throws Exception { + IScheduledTask task1 = makeTask("taskA", 1, RUNNING); + CountDownLatch workCalled = new CountDownLatch(1); + + jettyServer.setHandler(mockCoordinatorError()); + jettyServer.start(); + + control.replay(); + + while (!coordinatorResponded.await(100, TimeUnit.MILLISECONDS)) { + slaManager.checkSlaThenAct( + task1, + createCoordinatorSlaPolicy(), + storeProvider -> { + // set the marker to indicate that we performed the work + workCalled.countDown(); + storeProvider + .getUnsafeTaskStore() + .fetchTask(task1.getAssignedTask().getTaskId()); + return null; + }, + false); + } + + try { + workCalled.await(1, TimeUnit.SECONDS); + } catch (InterruptedException e) { + // expected + } + + assertEquals(0, coordinatorResponded.getCount()); + // check the work was not called + assertEquals(1, workCalled.getCount()); + } + + /** + * Verifies that SLA check fails and the supplied {@link Storage.MutateWork} does not get + * executed for a job when {@link CoordinatorSlaPolicy#coordinatorUrl} throws. 
+ */ + @Test + public void testCheckCoordinatorSlaThrowsDoesNotAct() throws Exception { + IScheduledTask task1 = makeTask("taskA", 1, RUNNING); + CountDownLatch workCalled = new CountDownLatch(1); + + jettyServer.setHandler(mockCoordinatorError()); + jettyServer.start(); + + control.replay(); + + slaManager.checkSlaThenAct( + task1, + ISlaPolicy.build(SlaPolicy.coordinatorSlaPolicy( + new CoordinatorSlaPolicy() + .setCoordinatorUrl("http://does.not.exist.com"))), + storeProvider -> { + // count down the latch to record that the work was (unexpectedly) performed + workCalled.countDown(); + storeProvider + .getUnsafeTaskStore() + .fetchTask(task1.getAssignedTask().getTaskId()); + return null; + }, + false); + + try { + workCalled.await(1, TimeUnit.SECONDS); + } catch (InterruptedException e) { + // interruption is ignored; the expected outcome is a timeout, which makes await() return false rather than throw + } + + // check the work was not called: its latch is still at 1 + assertEquals(1, workCalled.getCount()); + } + + /** + * Start 2 actions against the same coordinator url and make the first one that enters the coordinator lock block + * on a latch. Only when that latch is released will the next action be able to acquire the + * lock. At the end both actions should have completed. 
+ */ + @Test + public void testCheckCoordinatorSlaNoLockDoesNotAct() throws Exception { + IScheduledTask task1 = makeTask("taskA", 1, RUNNING); + IScheduledTask task2 = makeTask("taskB", 2, RUNNING); + + CountDownLatch blocked1 = new CountDownLatch(1); + CountDownLatch blocked2 = new CountDownLatch(1); + CountDownLatch action1 = new CountDownLatch(1); + CountDownLatch action2 = new CountDownLatch(1); + CountDownLatch finishAction1 = new CountDownLatch(1); + CountDownLatch finishAction2 = new CountDownLatch(1); + + jettyServer.setHandler(mockCoordinatorResponses("{\"drain\": true}")); + jettyServer.start(); + + expect(storageUtil.taskStore.fetchTask(task1.getAssignedTask().getTaskId())) + .andReturn(Optional.of(task1)); + + expect(storageUtil.taskStore.fetchTask(task2.getAssignedTask().getTaskId())) + .andReturn(Optional.of(task2)); + + control.replay(); + + // kick off a new thread to attempt starting action1 (NOTE(review): fail() below runs on this worker thread, so it presumably cannot fail the test itself - confirm) + new Thread(() -> { + try { + // retry every 100ms until action1 enters the lock + while (!action1.await(100, TimeUnit.MILLISECONDS)) { + // start action1 + slaManager.checkSlaThenAct( + task1, + createCoordinatorSlaPolicy(), + storeProvider -> { + LOG.info("Starting action1 for task:{}", slaManager.getTaskKey(task1)); + // count down the latch to record that we started the work + action1.countDown(); + storeProvider + .getUnsafeTaskStore() + .fetchTask(task1.getAssignedTask().getTaskId()); + // block here until the test counts down blocked1, to make sure that the lock is held + blocked1.await(); + finishAction1.countDown(); + LOG.info("Finished action1 for task:{}", slaManager.getTaskKey(task1)); + return null; + }, + false); + } + } catch (InterruptedException e) { + fail(); + } + }).start(); + + // wait for action1 to get the lock + action1.await(); + + // kick off a new thread to attempt starting action2 + new Thread(() -> { + try { + // retry every 100ms until action2 enters the lock + while (!action2.await(100, TimeUnit.MILLISECONDS)) { + // start action2 + slaManager.checkSlaThenAct( + task2, + 
createCoordinatorSlaPolicy(), + storeProvider -> { + LOG.info("Starting action2 for task:{}", slaManager.getTaskKey(task2)); + // count down the latch to record that we started the work + action2.countDown(); + storeProvider + .getUnsafeTaskStore() + .fetchTask(task2.getAssignedTask().getTaskId()); + // block here until the test counts down blocked2, to make sure that the lock is held + blocked2.await(); + finishAction2.countDown(); + LOG.info("Finished action2 for task:{}", slaManager.getTaskKey(task2)); + return null; + }, + false); + } + } catch (InterruptedException e) { + fail(); + } + }).start(); + + // the two actions must not both be inside the critical section + assertNotEquals(action1.getCount(), action2.getCount()); + // action1 should have entered the lock + assertEquals(0, action1.getCount()); + // action2 should not have entered the lock + assertEquals(1, action2.getCount()); + + // unblock blocked action1 + blocked1.countDown(); + // wait for action1 to finish + finishAction1.await(); + + // wait for the second action to enter the lock + action2.await(); + + // action2 should have entered the lock + assertEquals(0, action2.getCount()); + + // unblock blocked action2 + blocked2.countDown(); + + finishAction2.await(); + } + + /** + * Start 2 actions simultaneously with different coordinator urls, so they can both enter the + * appropriate coordinator locks and try to acquire the semaphore. Both the actions must be able + * to enter the critical area and start the work simultaneously. + * Until the test releases the semaphore only one action can fully complete the work; once it is + * released both actions complete. 
+ */ + @Test + public void testCheckCoordinatorSlaParallelAct() throws Exception { + IScheduledTask task1 = makeTask("taskA", 1, RUNNING); + IScheduledTask task2 = makeTask("taskB", 2, RUNNING); + + Semaphore blocked = new Semaphore(1); + CountDownLatch action1 = new CountDownLatch(1); + CountDownLatch action2 = new CountDownLatch(1); + CountDownLatch finished = new CountDownLatch(2); + + jettyServer.setHandler(mockCoordinatorResponses("{\"drain\": true}")); + jettyServer.start(); + + expect(storageUtil.taskStore.fetchTask(task1.getAssignedTask().getTaskId())) + .andReturn(Optional.of(task1)); + + expect(storageUtil.taskStore.fetchTask(task2.getAssignedTask().getTaskId())) + .andReturn(Optional.of(task2)); + + control.replay(); + + // kick off a new thread to attempt starting action1 (NOTE(review): fail() below runs on this worker thread, so it presumably cannot fail the test itself - confirm) + new Thread(() -> { + try { + // retry every 100ms until action1 enters the lock + while (!action1.await(100, TimeUnit.MILLISECONDS)) { + // start action1 + slaManager.checkSlaThenAct( + task1, + ISlaPolicy.build(SlaPolicy.coordinatorSlaPolicy( + new CoordinatorSlaPolicy() + .setCoordinatorUrl( + // Note that this url (localhost) differs from the one used by action2 (0.0.0.0) although both refer to the same server + String.format("http://localhost:%d", jettyServer.getURI().getPort())))), + storeProvider -> { + LOG.info("Starting action for task:{}", slaManager.getTaskKey(task1)); + // count down the latch to record that we started the work + action1.countDown(); + storeProvider + .getUnsafeTaskStore() + .fetchTask(task1.getAssignedTask().getTaskId()); + // the semaphore has a single permit: the first action to get here proceeds, the other blocks until the test releases a permit + blocked.acquire(); + finished.countDown(); + LOG.info("Finished action for task:{}", slaManager.getTaskKey(task1)); + return null; + }, + false); + } + } catch (InterruptedException e) { + fail(); + } + }).start(); + + // kick off a new thread to attempt starting action2 + new Thread(() -> { + try { + // retry every 100ms until action2 enters the lock + while (!action2.await(100, TimeUnit.MILLISECONDS)) { + // start action2 + slaManager.checkSlaThenAct( 
+ task2, + ISlaPolicy.build(SlaPolicy.coordinatorSlaPolicy( + new CoordinatorSlaPolicy() + .setCoordinatorUrl( + // Note that this url (0.0.0.0) differs from the one used by action1 (localhost) although both refer to the same server + String.format("http://0.0.0.0:%d", jettyServer.getURI().getPort())))), + storeProvider -> { + LOG.info("Starting action for task:{}", slaManager.getTaskKey(task2)); + // count down the latch to record that we started the work + action2.countDown(); + storeProvider + .getUnsafeTaskStore() + .fetchTask(task2.getAssignedTask().getTaskId()); + // the semaphore has a single permit: the first action to get here proceeds, the other blocks until the test releases a permit + blocked.acquire(); + finished.countDown(); + LOG.info("Finished action for task:{}", slaManager.getTaskKey(task2)); + return null; + }, + false); + } + } catch (InterruptedException e) { + fail(); + } + }).start(); + + // wait for both of the actions to start their work, i.e. both got their own lock + action1.await(); + action2.await(); + + assertEquals(0, action1.getCount()); + assertEquals(0, action2.getCount()); + + // only 1 action has fully completed (NOTE(review): nothing above waits for the permit holder to reach finished.countDown(), so this check looks racy - confirm) + assertEquals(1, finished.getCount()); + + // unblock the blocked action + blocked.release(); + + finished.await(); + + // both actions have fully completed + assertEquals(0, finished.getCount()); + } + + private ISlaPolicy createCoordinatorSlaPolicy() { + return createCoordinatorSlaPolicy("drain"); + } + + private ISlaPolicy createCoordinatorSlaPolicy(String statusKey) { + return ISlaPolicy.build(SlaPolicy.coordinatorSlaPolicy( + new CoordinatorSlaPolicy() + .setCoordinatorUrl(String.format("http://localhost:%d", jettyServer.getURI().getPort())) + .setStatusKey(statusKey) + )); + } + + private AbstractHandler mockCoordinatorResponse( + IScheduledTask task, + String pollResponse) { + + return new AbstractHandler() { + @Override + public void handle( + String target, + Request baseRequest, + HttpServletRequest request, + HttpServletResponse response) throws IOException { + try { + String taskKey = slaManager.getTaskKey(task); + String query = Joiner + .on("=") + 
.join(SlaManager.TASK_PARAM, URLEncoder.encode(taskKey, "UTF-8")); + String body = new TSerializer(new TSimpleJSONProtocol.Factory()) + .toString(task.newBuilder()); + if (request.getQueryString().equals(query) + && request.getReader().lines().collect(Collectors.joining()).equals(body)) { + createResponse(baseRequest, response, pollResponse); + } + coordinatorResponded.countDown(); + } catch (TException e) { + fail(); + } + } + }; + } + + private AbstractHandler mockCoordinatorResponses(String mockResponse) { + return new AbstractHandler() { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, + HttpServletResponse response) throws IOException { + createResponse(baseRequest, response, mockResponse); + } + }; + } + + private void createResponse( + Request baseRequest, + HttpServletResponse response, + String mockResponse) throws IOException { + response.setStatus(HttpServletResponse.SC_OK); + response.getWriter().write(mockResponse); + response.getWriter().flush(); + baseRequest.setHandled(true); + } + + private AbstractHandler mockCoordinatorError() { + return new AbstractHandler() { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, + HttpServletResponse response) { + response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); + baseRequest.setHandled(true); + coordinatorResponded.countDown(); + } + }; + } +}
http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/test/java/org/apache/aurora/scheduler/sla/SlaModuleTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/sla/SlaModuleTest.java b/src/test/java/org/apache/aurora/scheduler/sla/SlaModuleTest.java index d37e7a0..b38a1e3 100644 --- a/src/test/java/org/apache/aurora/scheduler/sla/SlaModuleTest.java +++ b/src/test/java/org/apache/aurora/scheduler/sla/SlaModuleTest.java @@ -30,17 +30,21 @@ import org.apache.aurora.common.stats.StatsProvider; import org.apache.aurora.common.testing.easymock.EasyMockTest; import org.apache.aurora.common.util.Clock; import org.apache.aurora.common.util.testing.FakeClock; +import org.apache.aurora.gen.ServerInfo; +import org.apache.aurora.scheduler.TierManager; import org.apache.aurora.scheduler.app.LifecycleModule; import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.config.types.TimeAmount; import org.apache.aurora.scheduler.sla.SlaModule.SlaUpdater; import org.apache.aurora.scheduler.storage.Storage; +import org.apache.aurora.scheduler.storage.entities.IServerInfo; import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; import org.easymock.EasyMock; import org.junit.Before; import org.junit.Test; import static org.apache.aurora.gen.ScheduleStatus.PENDING; +import static org.apache.aurora.scheduler.base.TaskTestUtil.TIER_MANAGER; import static org.apache.aurora.scheduler.sla.MetricCalculator.MetricCategory.JOB_UPTIMES; import static org.apache.aurora.scheduler.sla.MetricCalculator.MetricCategory.MEDIANS; import static org.apache.aurora.scheduler.sla.MetricCalculator.MetricCategory.PLATFORM_UPTIME; @@ -50,6 +54,9 @@ import static org.junit.Assert.assertNotNull; public class SlaModuleTest extends EasyMockTest { + private static final String CLUSTER_NAME = "test_cluster"; + private static final String STATS_URL_PREFIX = "fake_url"; + private Injector 
injector; private FakeClock clock; private StorageTestUtil storageUtil; @@ -57,7 +64,7 @@ public class SlaModuleTest extends EasyMockTest { private SlaModule module; @Before - public void setUp() throws Exception { + public void setUp() { storageUtil = new StorageTestUtil(this); clock = new FakeClock(); statsProvider = createMock(StatsProvider.class); @@ -76,6 +83,12 @@ public class SlaModuleTest extends EasyMockTest { bind(Clock.class).toInstance(clock); bind(Storage.class).toInstance(storageUtil.storage); bind(StatsProvider.class).toInstance(statsProvider); + bind(TierManager.class).toInstance(TIER_MANAGER); + bind(IServerInfo.class).toInstance( + IServerInfo.build( + new ServerInfo() + .setClusterName(CLUSTER_NAME) + .setStatsUrlPrefix(STATS_URL_PREFIX))); } }).build() ); http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java deleted file mode 100644 index 770846e..0000000 --- a/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java +++ /dev/null @@ -1,264 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.aurora.scheduler.state; - -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.Executor; - -import com.google.common.collect.ImmutableSet; -import com.google.common.util.concurrent.MoreExecutors; -import com.google.inject.AbstractModule; -import com.google.inject.Guice; -import com.google.inject.Injector; - -import org.apache.aurora.common.quantity.Amount; -import org.apache.aurora.common.quantity.Time; -import org.apache.aurora.common.stats.StatsProvider; -import org.apache.aurora.common.testing.easymock.EasyMockTest; -import org.apache.aurora.gen.HostAttributes; -import org.apache.aurora.gen.HostStatus; -import org.apache.aurora.gen.MaintenanceMode; -import org.apache.aurora.gen.ScheduleStatus; -import org.apache.aurora.gen.ScheduledTask; -import org.apache.aurora.scheduler.SchedulerModule.TaskEventBatchWorker; -import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor; -import org.apache.aurora.scheduler.base.Query; -import org.apache.aurora.scheduler.base.TaskTestUtil; -import org.apache.aurora.scheduler.base.Tasks; -import org.apache.aurora.scheduler.events.EventSink; -import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; -import org.apache.aurora.scheduler.events.PubsubEventModule; -import org.apache.aurora.scheduler.storage.Storage; -import org.apache.aurora.scheduler.storage.entities.IHostAttributes; -import org.apache.aurora.scheduler.storage.entities.IHostStatus; -import org.apache.aurora.scheduler.storage.entities.IScheduledTask; -import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; -import org.apache.aurora.scheduler.testing.FakeStatsProvider; -import org.apache.mesos.v1.Protos; -import org.junit.Before; -import org.junit.Test; - -import static org.apache.aurora.gen.MaintenanceMode.DRAINED; -import static org.apache.aurora.gen.MaintenanceMode.DRAINING; -import static org.apache.aurora.gen.MaintenanceMode.NONE; -import static 
org.apache.aurora.gen.MaintenanceMode.SCHEDULED; -import static org.apache.aurora.gen.ScheduleStatus.KILLED; -import static org.apache.aurora.gen.ScheduleStatus.RUNNING; -import static org.apache.aurora.scheduler.state.MaintenanceController.MaintenanceControllerImpl; -import static org.apache.aurora.scheduler.testing.BatchWorkerUtil.expectBatchExecute; -import static org.easymock.EasyMock.expect; -import static org.junit.Assert.assertEquals; - -public class MaintenanceControllerImplTest extends EasyMockTest { - - private static final String HOST_A = "a"; - private static final Set<String> A = ImmutableSet.of(HOST_A); - private static final Protos.OfferID OFFER_ID = Protos.OfferID.newBuilder() - .setValue("offer-id") - .build(); - private static final Protos.AgentID AGENT_ID = Protos.AgentID.newBuilder() - .setValue("agent-id") - .build(); - private static final Protos.FrameworkID FRAMEWORK_ID = Protos.FrameworkID.newBuilder() - .setValue("framework-id") - .build(); - private static final Protos.URL AGENT_URL = Protos.URL.newBuilder() - .setAddress(Protos.Address.newBuilder() - .setHostname(HOST_A) - .setPort(5051)) - .setScheme("http") - .build(); - private static final Protos.Unavailability UNAVAILABILITY = Protos.Unavailability.newBuilder() - .setStart(Protos.TimeInfo.newBuilder() - .setNanoseconds(Amount.of(1L, Time.MINUTES).as(Time.NANOSECONDS))) - .build(); - - private static final Protos.InverseOffer INVERSE_OFFER = Protos.InverseOffer.newBuilder() - .setId(OFFER_ID) - .setAgentId(AGENT_ID) - .setUrl(AGENT_URL) - .setFrameworkId(FRAMEWORK_ID) - .setUnavailability(UNAVAILABILITY) - .build(); - - private StorageTestUtil storageUtil; - private StateManager stateManager; - private MaintenanceController maintenance; - private EventSink eventSink; - - @Before - public void setUp() throws Exception { - storageUtil = new StorageTestUtil(this); - storageUtil.expectOperations(); - stateManager = createMock(StateManager.class); - TaskEventBatchWorker batchWorker = 
createMock(TaskEventBatchWorker.class); - expectBatchExecute(batchWorker, storageUtil.storage, control).anyTimes(); - - Injector injector = Guice.createInjector( - new PubsubEventModule(), - new AbstractModule() { - @Override - protected void configure() { - StateModule.bindMaintenanceController(binder()); - bind(Storage.class).toInstance(storageUtil.storage); - bind(StateManager.class).toInstance(stateManager); - bind(StatsProvider.class).toInstance(new FakeStatsProvider()); - bind(Executor.class).annotatedWith(AsyncExecutor.class) - .toInstance(MoreExecutors.directExecutor()); - bind(TaskEventBatchWorker.class).toInstance(batchWorker); - } - }); - maintenance = injector.getInstance(MaintenanceController.class); - eventSink = PubsubTestUtil.startPubsub(injector); - } - - private static IScheduledTask makeTask(String host, String taskId) { - ScheduledTask builder = TaskTestUtil.addStateTransition( - TaskTestUtil.makeTask(taskId, TaskTestUtil.JOB), - RUNNING, - 1000).newBuilder(); - builder.getAssignedTask().setSlaveHost(host); - return IScheduledTask.build(builder); - } - - @Test - public void testMaintenanceCycle() { - IScheduledTask task1 = makeTask(HOST_A, "taskA"); - IScheduledTask task2 = makeTask(HOST_A, "taskB"); - - expectMaintenanceModeChange(HOST_A, SCHEDULED); - expectFetchTasksByHost(HOST_A, ImmutableSet.of(task1, task2)); - expectTaskDraining(task1); - expectTaskDraining(task2); - expectMaintenanceModeChange(HOST_A, DRAINING); - IHostAttributes attributes = - IHostAttributes.build(new HostAttributes().setHost(HOST_A).setMode(DRAINING)); - - expect(storageUtil.attributeStore.getHostAttributes(HOST_A)) - .andReturn(Optional.of(attributes)).times(2); - - expect(storageUtil.attributeStore.getHostAttributes()).andReturn(ImmutableSet.of(attributes)); - expectFetchTasksByHost(HOST_A, ImmutableSet.of(task2)); - // TaskA is KILLED and therefore no longer active - expectFetchTasksByHost(HOST_A, ImmutableSet.of()); - expectMaintenanceModeChange(HOST_A, DRAINED); 
- expectMaintenanceModeChange(HOST_A, NONE); - - control.replay(); - - assertStatus(HOST_A, SCHEDULED, maintenance.startMaintenance(A)); - assertStatus(HOST_A, DRAINING, maintenance.drain(A)); - assertStatus(HOST_A, DRAINING, maintenance.getStatus(A)); - eventSink.post( - TaskStateChange.transition( - IScheduledTask.build(task1.newBuilder().setStatus(KILLED)), RUNNING)); - eventSink.post( - TaskStateChange.transition( - IScheduledTask.build(task2.newBuilder().setStatus(KILLED)), RUNNING)); - assertStatus(HOST_A, NONE, maintenance.endMaintenance(A)); - } - - @Test - public void testUnknownHost() { - expect(storageUtil.attributeStore.getHostAttributes("b")) - .andReturn(Optional.empty()); - - control.replay(); - - assertEquals(ImmutableSet.of(), - maintenance.startMaintenance(ImmutableSet.of("b"))); - } - - @Test - public void testDrainEmptyHost() { - expectMaintenanceModeChange(HOST_A, SCHEDULED); - expectFetchTasksByHost(HOST_A, ImmutableSet.of()); - expectMaintenanceModeChange(HOST_A, DRAINED); - - control.replay(); - - assertStatus(HOST_A, SCHEDULED, maintenance.startMaintenance(A)); - assertStatus(HOST_A, DRAINED, maintenance.drain(A)); - } - - @Test - public void testEndEarly() { - expectMaintenanceModeChange(HOST_A, SCHEDULED); - expectMaintenanceModeChange(HOST_A, NONE); - expect(storageUtil.attributeStore.getHostAttributes(HOST_A)).andReturn(Optional.of( - IHostAttributes.build(new HostAttributes().setHost(HOST_A).setMode(NONE)))); - - control.replay(); - - assertStatus(HOST_A, SCHEDULED, maintenance.startMaintenance(A)); - - // End maintenance without DRAINING. - assertStatus(HOST_A, NONE, maintenance.endMaintenance(A)); - - // Make sure a later transition on the host does not cause any ill effects that could surface - // from stale internal state. 
- eventSink.post(TaskStateChange.transition( - IScheduledTask.build(makeTask(HOST_A, "taskA").newBuilder().setStatus(KILLED)), RUNNING)); - } - - @Test - public void testGetMode() { - expect(storageUtil.attributeStore.getHostAttributes(HOST_A)).andReturn(Optional.of( - IHostAttributes.build(new HostAttributes().setHost(HOST_A).setMode(DRAINING)))); - expect(storageUtil.attributeStore.getHostAttributes("unknown")).andReturn(Optional.empty()); - - control.replay(); - - assertEquals(DRAINING, maintenance.getMode(HOST_A)); - assertEquals(NONE, maintenance.getMode("unknown")); - } - - @Test - public void testInverseOfferDrain() { - IScheduledTask task1 = makeTask(HOST_A, "taskA"); - expectFetchTasksByHost(HOST_A, ImmutableSet.of(task1)); - expectTaskDraining(task1); - - control.replay(); - maintenance.drainForInverseOffer(INVERSE_OFFER); - } - - private void expectTaskDraining(IScheduledTask task) { - expect(stateManager.changeState( - storageUtil.mutableStoreProvider, - Tasks.id(task), - Optional.empty(), - ScheduleStatus.DRAINING, - MaintenanceControllerImpl.DRAINING_MESSAGE)) - .andReturn(StateChangeResult.SUCCESS); - } - - private void expectFetchTasksByHost(String hostName, Set<IScheduledTask> tasks) { - expect(storageUtil.taskStore.fetchTasks(Query.slaveScoped(hostName).active())).andReturn(tasks); - } - - private void expectMaintenanceModeChange(String hostName, MaintenanceMode mode) { - IHostAttributes attributes = IHostAttributes.build(new HostAttributes().setHost(hostName)); - - expect(storageUtil.attributeStore.getHostAttributes(hostName)) - .andReturn(Optional.of(attributes)); - IHostAttributes updated = IHostAttributes.build(attributes.newBuilder().setMode(mode)); - expect(storageUtil.attributeStore.saveHostAttributes(updated)).andReturn(true); - } - - private void assertStatus(String host, MaintenanceMode mode, Set<IHostStatus> statuses) { - assertEquals(ImmutableSet.of(IHostStatus.build(new HostStatus(host, mode))), statuses); - } -} 
http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java index 2cf66d8..0fc3673 100644 --- a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java +++ b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java @@ -50,6 +50,7 @@ import org.apache.aurora.gen.ListBackupsResult; import org.apache.aurora.gen.MaintenanceMode; import org.apache.aurora.gen.MesosContainer; import org.apache.aurora.gen.Metadata; +import org.apache.aurora.gen.PercentageSlaPolicy; import org.apache.aurora.gen.PulseJobUpdateResult; import org.apache.aurora.gen.QueryRecoveryResult; import org.apache.aurora.gen.Range; @@ -62,6 +63,7 @@ import org.apache.aurora.gen.ResponseDetail; import org.apache.aurora.gen.Result; import org.apache.aurora.gen.ScheduleStatus; import org.apache.aurora.gen.ScheduledTask; +import org.apache.aurora.gen.SlaPolicy; import org.apache.aurora.gen.StartJobUpdateResult; import org.apache.aurora.gen.TaskConfig; import org.apache.aurora.gen.TaskConstraint; @@ -77,10 +79,10 @@ import org.apache.aurora.scheduler.configuration.SanitizedConfiguration; import org.apache.aurora.scheduler.cron.CronException; import org.apache.aurora.scheduler.cron.CronJobManager; import org.apache.aurora.scheduler.cron.SanitizedCronJob; +import org.apache.aurora.scheduler.maintenance.MaintenanceController; import org.apache.aurora.scheduler.quota.QuotaCheckResult; import org.apache.aurora.scheduler.quota.QuotaManager; import org.apache.aurora.scheduler.reconciliation.TaskReconciler; -import org.apache.aurora.scheduler.state.MaintenanceController; import org.apache.aurora.scheduler.state.StateChangeResult; import 
org.apache.aurora.scheduler.state.StateManager; import org.apache.aurora.scheduler.state.UUIDGenerator; @@ -153,6 +155,7 @@ import static org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.NOOP_J import static org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.NO_CRON; import static org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.PRUNE_TASKS; import static org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.RESTART_SHARDS; +import static org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.SLA_DRAIN_HOSTS; import static org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.START_JOB_UPDATE; import static org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.START_MAINTENANCE; import static org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.jobAlreadyExistsMessage; @@ -1097,6 +1100,59 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest { assertEquals(1L, statsProvider.getLongValue(END_MAINTENANCE)); } + @Test + public void testSLAHostMaintenance() throws Exception { + Set<String> hostnames = ImmutableSet.of("a"); + SlaPolicy defaultSlaPolicy = SlaPolicy.percentageSlaPolicy( + new PercentageSlaPolicy().setPercentage(95)); + Set<IHostStatus> none = status("a", NONE); + Set<IHostStatus> scheduled = status("a", SCHEDULED); + Set<IHostStatus> draining = status("a", DRAINING); + Set<IHostStatus> drained = status("a", DRAINING); + expect(maintenance.getStatus(hostnames)).andReturn(none); + expect(maintenance.startMaintenance(hostnames)).andReturn(scheduled); + expect(maintenance.slaDrain(hostnames, defaultSlaPolicy, 10)).andReturn(draining); + expect(maintenance.getStatus(hostnames)).andReturn(draining); + expect(maintenance.getStatus(hostnames)).andReturn(drained); + expect(maintenance.endMaintenance(hostnames)).andReturn(none); + + control.replay(); + + Hosts hosts = new Hosts(hostnames); + + assertEquals( + IHostStatus.toBuildersSet(none), + 
thrift.maintenanceStatus(hosts).getResult().getMaintenanceStatusResult() + .getStatuses()); + assertEquals(1L, statsProvider.getLongValue(MAINTENANCE_STATUS)); + assertEquals( + IHostStatus.toBuildersSet(scheduled), + thrift.startMaintenance(hosts).getResult().getStartMaintenanceResult() + .getStatuses()); + assertEquals(1L, statsProvider.getLongValue(START_MAINTENANCE)); + assertEquals( + IHostStatus.toBuildersSet(draining), + thrift.slaDrainHosts(hosts, defaultSlaPolicy, 10) + .getResult() + .getDrainHostsResult() + .getStatuses()); + assertEquals(1L, statsProvider.getLongValue(SLA_DRAIN_HOSTS)); + assertEquals( + IHostStatus.toBuildersSet(draining), + thrift.maintenanceStatus(hosts).getResult().getMaintenanceStatusResult() + .getStatuses()); + assertEquals(2L, statsProvider.getLongValue(MAINTENANCE_STATUS)); + assertEquals( + IHostStatus.toBuildersSet(drained), + thrift.maintenanceStatus(hosts).getResult().getMaintenanceStatusResult() + .getStatuses()); + assertEquals(3L, statsProvider.getLongValue(MAINTENANCE_STATUS)); + assertEquals( + IHostStatus.toBuildersSet(none), + thrift.endMaintenance(hosts).getResult().getEndMaintenanceResult().getStatuses()); + assertEquals(1L, statsProvider.getLongValue(END_MAINTENANCE)); + } + private static Response okEmptyResponse() { return response(OK, Optional.empty()); } http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java b/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java index 40851c4..f0199a4 100644 --- a/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java +++ b/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java @@ -25,9 +25,12 @@ import com.google.inject.Guice; import com.google.inject.Injector; import com.google.inject.Key; import com.google.inject.Provides; +import 
com.google.inject.TypeLiteral; import org.apache.aurora.common.application.ShutdownStage; import org.apache.aurora.common.base.Command; +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Time; import org.apache.aurora.common.testing.easymock.EasyMockTest; import org.apache.aurora.gen.AuroraAdmin; import org.apache.aurora.gen.Container; @@ -48,10 +51,12 @@ import org.apache.aurora.scheduler.app.ServiceGroupMonitor; import org.apache.aurora.scheduler.app.local.FakeNonVolatileStorage; import org.apache.aurora.scheduler.base.TaskTestUtil; import org.apache.aurora.scheduler.config.CliOptions; +import org.apache.aurora.scheduler.config.types.TimeAmount; import org.apache.aurora.scheduler.configuration.ConfigurationManager; import org.apache.aurora.scheduler.configuration.ConfigurationManager.ConfigurationManagerSettings; import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings; import org.apache.aurora.scheduler.cron.quartz.CronModule; +import org.apache.aurora.scheduler.maintenance.MaintenanceController; import org.apache.aurora.scheduler.mesos.DriverFactory; import org.apache.aurora.scheduler.mesos.DriverSettings; import org.apache.aurora.scheduler.mesos.FrameworkInfoFactory; @@ -138,6 +143,10 @@ public class ThriftIT extends EasyMockTest { bindMock(StorageBackup.class); bindMock(SnapshotStore.class); bind(IServerInfo.class).toInstance(SERVER_INFO); + bind(new TypeLiteral<Amount<Long, Time>>() { }) + .annotatedWith( + MaintenanceController.MaintenanceControllerImpl.PollingInterval.class) + .toInstance(new TimeAmount(1, Time.MINUTES)); } @Provides @@ -181,6 +190,8 @@ public class ThriftIT extends EasyMockTest { true, true, false, + 20, + 1800, ConfigurationManager.DEFAULT_ALLOWED_JOB_ENVIRONMENTS); createThrift(configurationManagerSettings); @@ -189,6 +200,7 @@ public class ThriftIT extends EasyMockTest { TaskConfig task = TaskTestUtil.makeConfig(TaskTestUtil.JOB).newBuilder(); task.unsetExecutorConfig(); + 
task.unsetSlaPolicy(); task.setProduction(false) .setTier(TaskTestUtil.DEV_TIER_NAME) .setContainer(Container.docker(new DockerContainer() http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/test/java/org/apache/aurora/scheduler/thrift/aop/MockDecoratedThrift.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/aop/MockDecoratedThrift.java b/src/test/java/org/apache/aurora/scheduler/thrift/aop/MockDecoratedThrift.java index d412090..9903811 100644 --- a/src/test/java/org/apache/aurora/scheduler/thrift/aop/MockDecoratedThrift.java +++ b/src/test/java/org/apache/aurora/scheduler/thrift/aop/MockDecoratedThrift.java @@ -36,6 +36,7 @@ import org.apache.aurora.gen.JobUpdateRequest; import org.apache.aurora.gen.ResourceAggregate; import org.apache.aurora.gen.Response; import org.apache.aurora.gen.ScheduleStatus; +import org.apache.aurora.gen.SlaPolicy; import org.apache.aurora.gen.TaskQuery; import org.apache.aurora.scheduler.http.api.security.AuthorizingParam; import org.apache.aurora.scheduler.thrift.auth.DecoratedThrift; @@ -200,6 +201,12 @@ public class MockDecoratedThrift implements AnnotatedAuroraAdmin { } @Override + public Response slaDrainHosts(Hosts hosts, SlaPolicy defaultSlaPolicy, long timeoutSecs) + throws TException { + return this.annotatedAuroraAdmin.slaDrainHosts(hosts, defaultSlaPolicy, timeoutSecs); + } + + @Override public Response snapshot() throws TException { return this.annotatedAuroraAdmin.snapshot(); } http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/test/python/apache/aurora/admin/test_maintenance.py ---------------------------------------------------------------------- diff --git a/src/test/python/apache/aurora/admin/test_maintenance.py b/src/test/python/apache/aurora/admin/test_maintenance.py index ca0239b..d0f7956 100644 --- a/src/test/python/apache/aurora/admin/test_maintenance.py +++ 
b/src/test/python/apache/aurora/admin/test_maintenance.py @@ -18,7 +18,13 @@ from mock import Mock, patch from twitter.common.contextutil import temporary_file from apache.aurora.admin.host_maintenance import HostMaintenance -from apache.aurora.admin.maintenance import host_activate, host_deactivate, host_drain, host_status +from apache.aurora.admin.maintenance import ( + host_activate, + host_deactivate, + host_drain, + host_status, + sla_host_drain +) from .util import AuroraClientCommandTest @@ -29,12 +35,15 @@ from gen.apache.aurora.api.ttypes import ( HostStatus, MaintenanceMode, MaintenanceStatusResult, + PercentageSlaPolicy, + SlaPolicy, StartMaintenanceResult ) class TestMaintenanceCommands(AuroraClientCommandTest): HOSTNAMES = ['us-grf-20', 'us-jim-47', 'us-suz-01'] + DEFAULT_SLA_PERCENTAGE = 95 def make_mock_options(self): mock_options = Mock() @@ -46,6 +55,9 @@ class TestMaintenanceCommands(AuroraClientCommandTest): mock_options.percentage = None mock_options.duration = None mock_options.reason = None + mock_options.default_duration = '30m' + mock_options.default_percentage = self.DEFAULT_SLA_PERCENTAGE + mock_options.timeout = '2h' return mock_options def create_host_statuses(self, maintenance_mode, skip_hosts=None): @@ -127,9 +139,9 @@ class TestMaintenanceCommands(AuroraClientCommandTest): mock_scheduler_proxy.startMaintenance.return_value = self.create_start_maintenance_result() mock_scheduler_proxy.drainHosts.return_value = self.create_start_maintenance_result() mock_vector = self.create_mock_probe_hosts_vector([ - self.create_probe_hosts(self.HOSTNAMES[0], 95, True, None), - self.create_probe_hosts(self.HOSTNAMES[1], 95, True, None), - self.create_probe_hosts(self.HOSTNAMES[2], 95, True, None) + self.create_probe_hosts(self.HOSTNAMES[0], self.DEFAULT_SLA_PERCENTAGE, True, None), + self.create_probe_hosts(self.HOSTNAMES[1], self.DEFAULT_SLA_PERCENTAGE, True, None), + self.create_probe_hosts(self.HOSTNAMES[2], self.DEFAULT_SLA_PERCENTAGE, True, None) 
]) with contextlib.nested( @@ -163,9 +175,9 @@ class TestMaintenanceCommands(AuroraClientCommandTest): skip_hosts=['us-grf-20']) mock_scheduler_proxy.drainHosts.return_value = self.create_start_maintenance_result() mock_vector = self.create_mock_probe_hosts_vector([ - self.create_probe_hosts(self.HOSTNAMES[0], 95, True, None), - self.create_probe_hosts(self.HOSTNAMES[1], 95, True, None), - self.create_probe_hosts(self.HOSTNAMES[2], 95, True, None) + self.create_probe_hosts(self.HOSTNAMES[0], self.DEFAULT_SLA_PERCENTAGE, True, None), + self.create_probe_hosts(self.HOSTNAMES[1], self.DEFAULT_SLA_PERCENTAGE, True, None), + self.create_probe_hosts(self.HOSTNAMES[2], self.DEFAULT_SLA_PERCENTAGE, True, None) ]) with contextlib.nested( @@ -192,9 +204,9 @@ class TestMaintenanceCommands(AuroraClientCommandTest): mock_scheduler_proxy.startMaintenance.return_value = self.create_start_maintenance_result() mock_scheduler_proxy.drainHosts.return_value = self.create_start_maintenance_result() mock_vector = self.create_mock_probe_hosts_vector([ - self.create_probe_hosts(self.HOSTNAMES[0], 95, False, None), - self.create_probe_hosts(self.HOSTNAMES[1], 95, False, None), - self.create_probe_hosts(self.HOSTNAMES[2], 95, False, None) + self.create_probe_hosts(self.HOSTNAMES[0], self.DEFAULT_SLA_PERCENTAGE, False, None), + self.create_probe_hosts(self.HOSTNAMES[1], self.DEFAULT_SLA_PERCENTAGE, False, None), + self.create_probe_hosts(self.HOSTNAMES[2], self.DEFAULT_SLA_PERCENTAGE, False, None) ]) with contextlib.nested( @@ -222,9 +234,9 @@ class TestMaintenanceCommands(AuroraClientCommandTest): mock_scheduler_proxy.startMaintenance.return_value = self.create_start_maintenance_result() mock_scheduler_proxy.drainHosts.return_value = self.create_start_maintenance_result() mock_vector = self.create_mock_probe_hosts_vector([ - self.create_probe_hosts(self.HOSTNAMES[0], 95, False, None), - self.create_probe_hosts(self.HOSTNAMES[1], 95, False, None), - self.create_probe_hosts(self.HOSTNAMES[2], 
95, False, None) + self.create_probe_hosts(self.HOSTNAMES[0], self.DEFAULT_SLA_PERCENTAGE, False, None), + self.create_probe_hosts(self.HOSTNAMES[1], self.DEFAULT_SLA_PERCENTAGE, False, None), + self.create_probe_hosts(self.HOSTNAMES[2], self.DEFAULT_SLA_PERCENTAGE, False, None) ]) mock_wait = Mock() @@ -326,3 +338,46 @@ class TestMaintenanceCommands(AuroraClientCommandTest): mock_scheduler_proxy.maintenanceStatus.assert_called_with( Hosts(set(self.HOSTNAMES)), retry=True) + + def test_perform_sla_maintenance_hosts(self): + mock_options = self.make_mock_options() + + mock_api, mock_scheduler_proxy = self.create_mock_api() + mock_scheduler_proxy.slaDrainHosts.return_value = self.create_start_maintenance_result() + + with contextlib.nested( + patch('apache.aurora.client.api.SchedulerProxy', return_value=mock_scheduler_proxy), + patch('apache.aurora.admin.maintenance.CLUSTERS', new=self.TEST_CLUSTERS), + patch('twitter.common.app.get_options', return_value=mock_options)): + sla_host_drain([self.TEST_CLUSTER]) + + mock_scheduler_proxy.slaDrainHosts.assert_called_with( + Hosts(set(self.HOSTNAMES)), + SlaPolicy(percentageSlaPolicy=PercentageSlaPolicy( + percentage=self.DEFAULT_SLA_PERCENTAGE, + durationSecs=1800)), + 7200) + assert mock_scheduler_proxy.slaDrainHosts.call_count == 1 + + def test_perform_sla_maintenance_hosts_defaults(self): + mock_options = self.make_mock_options() + mock_options.default_percentage = self.DEFAULT_SLA_PERCENTAGE + mock_options.default_duration = '2m' + mock_options.timeout = '10m' + + mock_api, mock_scheduler_proxy = self.create_mock_api() + mock_scheduler_proxy.slaDrainHosts.return_value = self.create_start_maintenance_result() + + with contextlib.nested( + patch('apache.aurora.client.api.SchedulerProxy', return_value=mock_scheduler_proxy), + patch('apache.aurora.admin.maintenance.CLUSTERS', new=self.TEST_CLUSTERS), + patch('twitter.common.app.get_options', return_value=mock_options)): + sla_host_drain([self.TEST_CLUSTER]) + + 
mock_scheduler_proxy.slaDrainHosts.assert_called_with( + Hosts(set(self.HOSTNAMES)), + SlaPolicy(percentageSlaPolicy=PercentageSlaPolicy( + percentage=self.DEFAULT_SLA_PERCENTAGE, + durationSecs=120)), + 600) + assert mock_scheduler_proxy.slaDrainHosts.call_count == 1 http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/test/python/apache/aurora/api_util.py ---------------------------------------------------------------------- diff --git a/src/test/python/apache/aurora/api_util.py b/src/test/python/apache/aurora/api_util.py index 3fc9b47..2022db5 100644 --- a/src/test/python/apache/aurora/api_util.py +++ b/src/test/python/apache/aurora/api_util.py @@ -58,6 +58,9 @@ class SchedulerThriftApiSpec(ReadOnlyScheduler.Iface): def drainHosts(self, hosts): pass + def slaDrainHosts(self, hosts, defaultSlaPolicy, timeoutSecs): + pass + def maintenanceStatus(self, hosts): pass http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/test/python/apache/aurora/client/api/test_scheduler_client.py ---------------------------------------------------------------------- diff --git a/src/test/python/apache/aurora/client/api/test_scheduler_client.py b/src/test/python/apache/aurora/client/api/test_scheduler_client.py index f2a2eae..d1e1e43 100644 --- a/src/test/python/apache/aurora/client/api/test_scheduler_client.py +++ b/src/test/python/apache/aurora/client/api/test_scheduler_client.py @@ -45,6 +45,7 @@ from gen.apache.aurora.api.ttypes import ( ResponseCode, ResponseDetail, ScheduleStatus, + SlaPolicy, TaskQuery ) @@ -211,6 +212,14 @@ class TestSchedulerProxyAdminInjection(TestSchedulerProxyInjection): self.mox.ReplayAll() self.make_scheduler_proxy().drainHosts(Hosts()) + def test_slaDrainHosts(self): + self.mock_thrift_client.slaDrainHosts( + IsA(Hosts), + IsA(SlaPolicy), + 10).AndReturn(DEFAULT_RESPONSE) + self.mox.ReplayAll() + self.make_scheduler_proxy().slaDrainHosts(Hosts(), SlaPolicy(), 10) + def test_maintenanceStatus(self): 
self.mock_thrift_client.maintenanceStatus( IsA(Hosts)).AndReturn(DEFAULT_RESPONSE) http://git-wip-us.apache.org/repos/asf/aurora/blob/f2acf53f/src/test/python/apache/aurora/client/cli/test_add.py ---------------------------------------------------------------------- diff --git a/src/test/python/apache/aurora/client/cli/test_add.py b/src/test/python/apache/aurora/client/cli/test_add.py index b22b9f7..ff4ccf6 100644 --- a/src/test/python/apache/aurora/client/cli/test_add.py +++ b/src/test/python/apache/aurora/client/cli/test_add.py @@ -20,7 +20,8 @@ from apache.aurora.client.cli.options import TaskInstanceKey from .util import AuroraClientCommandTest, FakeAuroraCommandContext, mock_verb_options -from gen.apache.aurora.api.ttypes import ScheduleStatus +from gen.apache.aurora.api.constants import ACTIVE_STATES +from gen.apache.aurora.api.ttypes import ScheduleStatus, TaskQuery class TestAddCommand(AuroraClientCommandTest): @@ -37,6 +38,8 @@ class TestAddCommand(AuroraClientCommandTest): self._mock_options.open_browser = True self._fake_context.add_expected_query_result(self.create_query_call_result( self.create_scheduled_task(1, ScheduleStatus.RUNNING))) + self._fake_context.add_expected_query_result( + self.create_query_call_result(), job_key=self.TEST_JOBKEY) self._mock_api.add_instances.return_value = self.create_simple_success_response() @@ -50,11 +53,17 @@ class TestAddCommand(AuroraClientCommandTest): assert mock_webbrowser.mock_calls == [ call("http://something_or_other/scheduler/bozo/test/hello") ] + assert self._mock_api.query_no_configs.mock_calls == [ + call(TaskQuery(jobKeys=[self.TEST_JOBKEY.to_thrift()], statuses=ACTIVE_STATES)), + call(TaskQuery(jobKeys=[self.TEST_JOBKEY.to_thrift()], statuses=ACTIVE_STATES)) + ] def test_wait_added_instances(self): self._mock_options.wait_until = 'RUNNING' self._fake_context.add_expected_query_result(self.create_query_call_result( self.create_scheduled_task(1, ScheduleStatus.PENDING))) + 
self._fake_context.add_expected_query_result( + self.create_query_call_result(), job_key=self.TEST_JOBKEY) self._mock_api.add_instances.return_value = self.create_simple_success_response() @@ -70,6 +79,10 @@ class TestAddCommand(AuroraClientCommandTest): self.TEST_JOBKEY, self._mock_api, [2, 3, 4])] + assert self._mock_api.query_no_configs.mock_calls == [ + call(TaskQuery(jobKeys=[self.TEST_JOBKEY.to_thrift()], statuses=ACTIVE_STATES)), + call(TaskQuery(jobKeys=[self.TEST_JOBKEY.to_thrift()], statuses=ACTIVE_STATES)) + ] def test_no_active_instance(self): self._fake_context.add_expected_query_result(self.create_empty_task_result()) @@ -79,8 +92,15 @@ class TestAddCommand(AuroraClientCommandTest): def test_add_instances_raises(self): self._fake_context.add_expected_query_result(self.create_query_call_result( self.create_scheduled_task(1, ScheduleStatus.PENDING))) + self._fake_context.add_expected_query_result( + self.create_query_call_result(), job_key=self.TEST_JOBKEY) self._mock_api.add_instances.return_value = self.create_error_response() with pytest.raises(Context.CommandError): self._command.execute(self._fake_context) + + assert self._mock_api.query_no_configs.mock_calls == [ + call(TaskQuery(jobKeys=[self.TEST_JOBKEY.to_thrift()], statuses=ACTIVE_STATES)), + call(TaskQuery(jobKeys=[self.TEST_JOBKEY.to_thrift()], statuses=ACTIVE_STATES)) + ]
