Repository: aurora Updated Branches: refs/heads/master 9faec2f05 -> 0d316cfbe
Re-purposing addInstances RPC to act as scaleOut Bugs closed: AURORA-1258 Reviewed at https://reviews.apache.org/r/42759/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/0d316cfb Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/0d316cfb Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/0d316cfb Branch: refs/heads/master Commit: 0d316cfbe39d3ac17629540550af097cd44992f0 Parents: 9faec2f Author: Maxim Khutornenko <[email protected]> Authored: Wed Jan 27 00:07:31 2016 -0800 Committer: Maxim Khutornenko <[email protected]> Committed: Wed Jan 27 00:07:31 2016 -0800 ---------------------------------------------------------------------- NEWS | 3 +- .../thrift/org/apache/aurora/gen/api.thrift | 10 +- .../org/apache/aurora/scheduler/base/Query.java | 4 - .../ShiroAuthorizingParamInterceptor.java | 9 +- .../aurora/scheduler/state/LockManagerImpl.java | 2 +- .../thrift/SchedulerThriftInterface.java | 78 ++++++++--- .../thrift/aop/AnnotatedAuroraAdmin.java | 5 +- .../aurora/scheduler/thrift/Fixtures.java | 3 + .../thrift/SchedulerThriftInterfaceTest.java | 135 ++++++++++++++----- 9 files changed, 179 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/0d316cfb/NEWS ---------------------------------------------------------------------- diff --git a/NEWS b/NEWS index e46b2cc..0e9f3b3 100644 --- a/NEWS +++ b/NEWS @@ -37,7 +37,8 @@ controlled exclusively via the client with `aurora job restart --restart-threshold=[seconds]`. - Removed executor flag `--announcer-enable`. Enabling the announcer previously required both flags `--announcer-enable` and `--announcer-ensemble`, but now only `--announcer-ensemble` must be set. - +- Deprecated `AddInstancesConfig` argument in `addInstances` thrift RPC. The new behavior is to + increase job instance count (scale out) based on the task template pointed by instance `key`. 0.11.0 ------ http://git-wip-us.apache.org/repos/asf/aurora/blob/0d316cfb/api/src/main/thrift/org/apache/aurora/gen/api.thrift ---------------------------------------------------------------------- diff --git a/api/src/main/thrift/org/apache/aurora/gen/api.thrift b/api/src/main/thrift/org/apache/aurora/gen/api.thrift index f0e330c..95313a0 100644 --- a/api/src/main/thrift/org/apache/aurora/gen/api.thrift +++ b/api/src/main/thrift/org/apache/aurora/gen/api.thrift @@ -1027,10 +1027,14 @@ service AuroraSchedulerManager extends ReadOnlyScheduler { Response killTasks(1: TaskQuery query, 3: Lock lock, 4: JobKey job, 5: set<i32> instances) /** - * Adds new instances specified by the AddInstancesConfig. A job represented by the JobKey must be - * protected by Lock. + * Adds new instances with the TaskConfig of the existing instance pointed by the key. + * TODO(maxim): remove AddInstancesConfig in AURORA-1595. */ - Response addInstances(1: AddInstancesConfig config, 2: Lock lock) + Response addInstances( + 1: AddInstancesConfig config, + 2: Lock lock, + 3: InstanceKey key, + 4: i32 count) /** * Creates and saves a new Lock instance guarding against multiple mutating operations within the http://git-wip-us.apache.org/repos/asf/aurora/blob/0d316cfb/src/main/java/org/apache/aurora/scheduler/base/Query.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/base/Query.java b/src/main/java/org/apache/aurora/scheduler/base/Query.java index 7bf0afb..fbadfd3 100644 --- a/src/main/java/org/apache/aurora/scheduler/base/Query.java +++ b/src/main/java/org/apache/aurora/scheduler/base/Query.java @@ -110,10 +110,6 @@ public final class Query { return unscoped().byStatus(status, statuses); } - public static Builder statusScoped(Iterable<ScheduleStatus> statuses) { - return unscoped().byStatus(statuses); - } - /** * A Builder of TaskQueries. Builders are immutable and provide access to a set of convenience * methods to return a new builder of another scope. Available scope filters include slave, http://git-wip-us.apache.org/repos/asf/aurora/blob/0d316cfb/src/main/java/org/apache/aurora/scheduler/http/api/security/ShiroAuthorizingParamInterceptor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/http/api/security/ShiroAuthorizingParamInterceptor.java b/src/main/java/org/apache/aurora/scheduler/http/api/security/ShiroAuthorizingParamInterceptor.java index 21e565e..3043dfa 100644 --- a/src/main/java/org/apache/aurora/scheduler/http/api/security/ShiroAuthorizingParamInterceptor.java +++ b/src/main/java/org/apache/aurora/scheduler/http/api/security/ShiroAuthorizingParamInterceptor.java @@ -39,6 +39,7 @@ import org.aopalliance.intercept.MethodInterceptor; import org.aopalliance.intercept.MethodInvocation; import org.apache.aurora.common.stats.StatsProvider; import org.apache.aurora.gen.AddInstancesConfig; +import org.apache.aurora.gen.InstanceKey; import org.apache.aurora.gen.JobConfiguration; import org.apache.aurora.gen.JobKey; import org.apache.aurora.gen.JobUpdateKey; @@ -139,6 +140,9 @@ class ShiroAuthorizingParamInterceptor implements MethodInterceptor { AddInstancesConfig._Fields.KEY, JobKey.class); + private static final FieldGetter<InstanceKey, JobKey> INSTANCE_KEY_GETTER = + new ThriftFieldGetter<>(InstanceKey.class, InstanceKey._Fields.JOB_KEY, JobKey.class); + @SuppressWarnings("unchecked") private static final Set<FieldGetter<?, JobKey>> FIELD_GETTERS = ImmutableSet.of( @@ -150,13 +154,12 @@ class ShiroAuthorizingParamInterceptor implements MethodInterceptor { JOB_UPDATE_KEY_GETTER, ADD_INSTANCES_CONFIG_GETTER, QUERY_TO_JOB_KEY, + INSTANCE_KEY_GETTER, new IdentityFieldGetter<>(JobKey.class)); private static final Map<Class<?>, Function<?, Optional<JobKey>>> FIELD_GETTERS_BY_TYPE = ImmutableMap.<Class<?>, Function<?, Optional<JobKey>>>builder() - .putAll(Maps.uniqueIndex( - FIELD_GETTERS, - (Function<FieldGetter<?, JobKey>, Class<?>>) FieldGetter::getStructClass)) + .putAll(Maps.uniqueIndex(FIELD_GETTERS, FieldGetter::getStructClass)) .build(); @VisibleForTesting http://git-wip-us.apache.org/repos/asf/aurora/blob/0d316cfb/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java index 59c9786..6da6c69 100644 --- a/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java +++ b/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java @@ -99,7 +99,7 @@ public class LockManagerImpl implements LockManager { if (!stored.equals(heldLock)) { if (stored.isPresent()) { throw new LockException(String.format( - "Unable to perform operation for: %s. Use override/cancel option.", + "Unable to perform operation for %s due to active lock held", formatLockKey(context))); } else if (heldLock.isPresent()) { throw new LockException( http://git-wip-us.apache.org/repos/asf/aurora/blob/0d316cfb/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java index c53c49e..6767024 100644 --- a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java +++ b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java @@ -13,6 +13,7 @@ */ package org.apache.aurora.scheduler.thrift; +import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Set; @@ -23,6 +24,8 @@ import javax.inject.Inject; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.collect.ContiguousSet; +import com.google.common.collect.DiscreteDomain; import com.google.common.collect.FluentIterable; import com.google.common.collect.HashMultimap; import com.google.common.collect.ImmutableSet; @@ -39,6 +42,7 @@ import org.apache.aurora.gen.ConfigRewrite; import org.apache.aurora.gen.DrainHostsResult; import org.apache.aurora.gen.EndMaintenanceResult; import org.apache.aurora.gen.Hosts; +import org.apache.aurora.gen.InstanceKey; import org.apache.aurora.gen.InstanceTaskConfig; import org.apache.aurora.gen.JobConfiguration; import org.apache.aurora.gen.JobKey; @@ -748,18 +752,16 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin { } @Override - public Response addInstances(AddInstancesConfig config, @Nullable Lock mutableLock) { - requireNonNull(config); - checkNotBlank(config.getInstanceIds()); - IJobKey jobKey = JobKeys.assertValid(IJobKey.build(config.getKey())); + public Response addInstances( + @Nullable AddInstancesConfig config, + @Nullable Lock mutableLock, + @Nullable InstanceKey key, + int count) { - ITaskConfig task; - try { - task = configurationManager.validateAndPopulate(ITaskConfig.build(config.getTaskConfig())); - } catch (TaskDescriptionException e) { - return error(INVALID_REQUEST, e); - } + IJobKey jobKey = + JobKeys.assertValid(IJobKey.build(config == null ? key.getJobKey() : config.getKey())); + Response response = empty(); return storage.write(storeProvider -> { try { if (getCronJob(storeProvider, jobKey).isPresent()) { @@ -770,23 +772,53 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin { ILockKey.build(LockKey.job(jobKey.newBuilder())), java.util.Optional.ofNullable(mutableLock).map(ILock::build)); - Iterable<IScheduledTask> currentTasks = storeProvider.getTaskStore().fetchTasks( - Query.jobScoped(task.getJob()).active()); + FluentIterable<IScheduledTask> currentTasks = FluentIterable.from( + storeProvider.getTaskStore().fetchTasks(Query.jobScoped(jobKey).active())); + + ITaskConfig task; + Set<Integer> instanceIds; + if (config == null) { + if (count <= 0) { + return invalidRequest(INVALID_INSTANCE_COUNT); + } + + Optional<IScheduledTask> templateTask = Iterables.tryFind( + currentTasks, + e -> e.getAssignedTask().getInstanceId() == key.getInstanceId()); + if (!templateTask.isPresent()) { + return invalidRequest(INVALID_INSTANCE_ID); + } + + task = templateTask.get().getAssignedTask().getTask(); + int lastId = currentTasks + .transform(e -> e.getAssignedTask().getInstanceId()) + .toList() + .stream() + .max(Comparator.naturalOrder()).get(); + + instanceIds = ContiguousSet.create( + Range.openClosed(lastId, lastId + count), + DiscreteDomain.integers()); + } else { + checkNotBlank(config.getInstanceIds()); + addMessage(response, "The AddInstancesConfig field is deprecated."); + + task = configurationManager.validateAndPopulate( + ITaskConfig.build(config.getTaskConfig())); + instanceIds = ImmutableSet.copyOf(config.getInstanceIds()); + } validateTaskLimits( task, - Iterables.size(currentTasks) + config.getInstanceIdsSize(), - quotaManager.checkInstanceAddition(task, config.getInstanceIdsSize(), storeProvider)); + Iterables.size(currentTasks) + instanceIds.size(), + quotaManager.checkInstanceAddition(task, instanceIds.size(), storeProvider)); - stateManager.insertPendingTasks( - storeProvider, - task, - ImmutableSet.copyOf(config.getInstanceIds())); + stateManager.insertPendingTasks(storeProvider, task, instanceIds); - return ok(); + return response.setResponseCode(OK); } catch (LockException e) { return error(LOCK_ERROR, e); - } catch (TaskValidationException | IllegalArgumentException e) { + } catch (TaskDescriptionException | TaskValidationException | IllegalArgumentException e) { return error(INVALID_REQUEST, e); } }); @@ -1121,4 +1153,10 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin { @VisibleForTesting static final String INVALID_PULSE_TIMEOUT = "blockIfNoPulsesAfterMs must be positive."; + + @VisibleForTesting + static final String INVALID_INSTANCE_ID = "No active task found for a given instance ID."; + + @VisibleForTesting + static final String INVALID_INSTANCE_COUNT = "Instance count must be positive."; } http://git-wip-us.apache.org/repos/asf/aurora/blob/0d316cfb/src/main/java/org/apache/aurora/scheduler/thrift/aop/AnnotatedAuroraAdmin.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/aop/AnnotatedAuroraAdmin.java b/src/main/java/org/apache/aurora/scheduler/thrift/aop/AnnotatedAuroraAdmin.java index c374343..f2f69f9 100644 --- a/src/main/java/org/apache/aurora/scheduler/thrift/aop/AnnotatedAuroraAdmin.java +++ b/src/main/java/org/apache/aurora/scheduler/thrift/aop/AnnotatedAuroraAdmin.java @@ -19,6 +19,7 @@ import javax.annotation.Nullable; import org.apache.aurora.gen.AddInstancesConfig; import org.apache.aurora.gen.AuroraAdmin; +import org.apache.aurora.gen.InstanceKey; import org.apache.aurora.gen.JobConfiguration; import org.apache.aurora.gen.JobKey; import org.apache.aurora.gen.JobUpdateKey; @@ -78,7 +79,9 @@ public interface AnnotatedAuroraAdmin extends AuroraAdmin.Iface { @Override Response addInstances( @AuthorizingParam @Nullable AddInstancesConfig config, - @Nullable Lock lock) throws TException; + @Nullable Lock lock, + @AuthorizingParam @Nullable InstanceKey key, + int count) throws TException; @Override Response acquireLock( http://git-wip-us.apache.org/repos/asf/aurora/blob/0d316cfb/src/test/java/org/apache/aurora/scheduler/thrift/Fixtures.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/Fixtures.java b/src/test/java/org/apache/aurora/scheduler/thrift/Fixtures.java index 72d2182..e456056 100644 --- a/src/test/java/org/apache/aurora/scheduler/thrift/Fixtures.java +++ b/src/test/java/org/apache/aurora/scheduler/thrift/Fixtures.java @@ -29,6 +29,7 @@ import org.apache.aurora.gen.AssignedTask; import org.apache.aurora.gen.Container; import org.apache.aurora.gen.ExecutorConfig; import org.apache.aurora.gen.Identity; +import org.apache.aurora.gen.InstanceKey; import org.apache.aurora.gen.JobConfiguration; import org.apache.aurora.gen.JobSummary; import org.apache.aurora.gen.JobSummaryResult; @@ -80,6 +81,8 @@ final class Fixtures { IResourceAggregate.build(new ResourceAggregate(10.0, 1024, 2048)); static final QuotaCheckResult ENOUGH_QUOTA = new QuotaCheckResult(SUFFICIENT_QUOTA); static final QuotaCheckResult NOT_ENOUGH_QUOTA = new QuotaCheckResult(INSUFFICIENT_QUOTA); + static final InstanceKey INSTANCE_KEY = new InstanceKey(JOB_KEY.newBuilder(), 0); + static final TaskConfig INVALID_TASK_CONFIG = defaultTask(true).setTier(","); private Fixtures() { // Utility class. http://git-wip-us.apache.org/repos/asf/aurora/blob/0d316cfb/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java index 84356d6..4ad9211 100644 --- a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java +++ b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java @@ -53,6 +53,7 @@ import org.apache.aurora.gen.JobUpdateSettings; import org.apache.aurora.gen.JobUpdateSummary; import org.apache.aurora.gen.LimitConstraint; import org.apache.aurora.gen.ListBackupsResult; +import org.apache.aurora.gen.Lock; import org.apache.aurora.gen.LockKey; import org.apache.aurora.gen.MaintenanceMode; import org.apache.aurora.gen.MesosContainer; @@ -125,6 +126,8 @@ import static org.apache.aurora.scheduler.configuration.ConfigurationManager.DED import static org.apache.aurora.scheduler.storage.backup.Recovery.RecoveryException; import static org.apache.aurora.scheduler.thrift.Fixtures.CRON_JOB; import static org.apache.aurora.scheduler.thrift.Fixtures.ENOUGH_QUOTA; +import static org.apache.aurora.scheduler.thrift.Fixtures.INSTANCE_KEY; +import static org.apache.aurora.scheduler.thrift.Fixtures.INVALID_TASK_CONFIG; import static org.apache.aurora.scheduler.thrift.Fixtures.JOB_KEY; import static org.apache.aurora.scheduler.thrift.Fixtures.JOB_NAME; import static org.apache.aurora.scheduler.thrift.Fixtures.LOCK; @@ -309,7 +312,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest { @Test public void testCreateJobFailsConfigCheck() throws Exception { - IJobConfiguration job = IJobConfiguration.build(makeJob(null)); + IJobConfiguration job = IJobConfiguration.build(makeJob(INVALID_TASK_CONFIG)); control.replay(); assertResponse( @@ -584,6 +587,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest { return IScheduledTask.build(new ScheduledTask() .setAssignedTask(new AssignedTask() .setTaskId(taskId) + .setInstanceId(0) .setTask(new TaskConfig() .setJob(JOB_KEY.newBuilder().setName(jobName)) .setOwner(ROLE_IDENTITY) @@ -971,7 +975,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest { @Test public void testScheduleCronJobFailedTaskConfigValidation() throws Exception { control.replay(); - IJobConfiguration job = IJobConfiguration.build(makeJob(null)); + IJobConfiguration job = IJobConfiguration.build(makeJob(INVALID_TASK_CONFIG)); assertResponse( INVALID_REQUEST, thrift.scheduleCronJob(job.newBuilder(), null)); @@ -1337,7 +1341,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest { // Validate key is populated during sanitizing. AddInstancesConfig config = createInstanceConfig(populatedTask.newBuilder()); config.getTaskConfig().unsetJob(); - assertOkResponse(thrift.addInstances(config, LOCK.newBuilder())); + assertOkResponse(deprecatedAddInstances(config, LOCK.newBuilder())); } @Test @@ -1357,104 +1361,162 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest { control.replay(); - assertOkResponse(thrift.addInstances(config, null)); + Response response = deprecatedAddInstances(config, null); + assertOkResponse(response); + assertMessageMatches(response, "The AddInstancesConfig field is deprecated."); + } + + @Test + public void testAddInstancesWithInstanceKey() throws Exception { + expectNoCronJob(); + lockManager.validateIfLocked(LOCK_KEY, java.util.Optional.empty()); + IScheduledTask activeTask = buildScheduledTask(); + ITaskConfig task = activeTask.getAssignedTask().getTask(); + storageUtil.expectTaskFetch(Query.jobScoped(JOB_KEY).active(), activeTask); + expect(taskIdGenerator.generate(task, 3)).andReturn(TASK_ID); + expect(quotaManager.checkInstanceAddition( + task, + 2, + storageUtil.mutableStoreProvider)).andReturn(ENOUGH_QUOTA); + stateManager.insertPendingTasks( + storageUtil.mutableStoreProvider, + task, + ImmutableSet.of(1, 2)); + + control.replay(); + + assertOkResponse(newAddInstances(INSTANCE_KEY, 2)); + } + + @Test + public void testAddInstancesWithInstanceKeyFailsWithNoInstance() throws Exception { + expectNoCronJob(); + lockManager.validateIfLocked(LOCK_KEY, java.util.Optional.empty()); + storageUtil.expectTaskFetch(Query.jobScoped(JOB_KEY).active()); + + control.replay(); + + assertEquals( + invalidResponse(SchedulerThriftInterface.INVALID_INSTANCE_ID), + newAddInstances(INSTANCE_KEY, 2)); + } + + @Test + public void testAddInstancesWithInstanceKeyFailsInvalidCount() throws Exception { + expectNoCronJob(); + lockManager.validateIfLocked(LOCK_KEY, java.util.Optional.empty()); + storageUtil.expectTaskFetch(Query.jobScoped(JOB_KEY).active()); + + control.replay(); + + assertEquals( + invalidResponse(SchedulerThriftInterface.INVALID_INSTANCE_COUNT), + newAddInstances(INSTANCE_KEY, 0)); } @Test public void testAddInstancesFailsConfigCheck() throws Exception { - AddInstancesConfig config = createInstanceConfig(defaultTask(true).setJobName(null)); + AddInstancesConfig config = createInstanceConfig(INVALID_TASK_CONFIG); + expectNoCronJob(); + storageUtil.expectTaskFetch(Query.jobScoped(JOB_KEY).active()); + lockManager.validateIfLocked(LOCK_KEY, java.util.Optional.empty()); control.replay(); - assertResponse(INVALID_REQUEST, thrift.addInstances(config, LOCK.newBuilder())); + assertResponse(INVALID_REQUEST, deprecatedAddInstances(config, null)); } @Test public void testAddInstancesFailsCronJob() throws Exception { - AddInstancesConfig config = createInstanceConfig(defaultTask(true)); expectCronJob(); control.replay(); - assertResponse(INVALID_REQUEST, thrift.addInstances(config, LOCK.newBuilder())); + assertResponse(INVALID_REQUEST, newAddInstances(INSTANCE_KEY, 1)); } @Test(expected = StorageException.class) public void testAddInstancesFailsWithNonTransient() throws Exception { - AddInstancesConfig config = createInstanceConfig(defaultTask(true)); expect(storageUtil.jobStore.fetchJob(JOB_KEY)).andThrow(new StorageException("no retry")); control.replay(); - thrift.addInstances(config, LOCK.newBuilder()); + newAddInstances(INSTANCE_KEY, 1); } @Test public void testAddInstancesLockCheckFails() throws Exception { - AddInstancesConfig config = createInstanceConfig(defaultTask(true)); expectNoCronJob(); - lockManager.validateIfLocked(LOCK_KEY, java.util.Optional.of(LOCK)); + lockManager.validateIfLocked(LOCK_KEY, java.util.Optional.empty()); expectLastCall().andThrow(new LockException("Failed lock check.")); control.replay(); - assertResponse(LOCK_ERROR, thrift.addInstances(config, LOCK.newBuilder())); + assertResponse(LOCK_ERROR, newAddInstances(INSTANCE_KEY, 1)); } @Test public void testAddInstancesFailsTaskIdLength() throws Exception { - ITaskConfig populatedTask = ITaskConfig.build(populatedTask()); - AddInstancesConfig config = createInstanceConfig(populatedTask.newBuilder()); + IScheduledTask activeTask = buildScheduledTask(); + ITaskConfig task = activeTask.getAssignedTask().getTask(); expectNoCronJob(); - lockManager.validateIfLocked(LOCK_KEY, java.util.Optional.of(LOCK)); - storageUtil.expectTaskFetch(Query.jobScoped(JOB_KEY).active()); + lockManager.validateIfLocked(LOCK_KEY, java.util.Optional.empty()); + storageUtil.expectTaskFetch(Query.jobScoped(JOB_KEY).active(), activeTask); expect(quotaManager.checkInstanceAddition( anyObject(ITaskConfig.class), anyInt(), eq(storageUtil.mutableStoreProvider))).andReturn(ENOUGH_QUOTA); - expect(taskIdGenerator.generate(populatedTask, 1)) + expect(taskIdGenerator.generate(task, 2)) .andReturn(Strings.repeat("a", MAX_TASK_ID_LENGTH + 1)); control.replay(); - assertResponse(INVALID_REQUEST, thrift.addInstances(config, LOCK.newBuilder())); + assertResponse(INVALID_REQUEST, newAddInstances(INSTANCE_KEY, 1)); } @Test public void testAddInstancesFailsQuotaCheck() throws Exception { - ITaskConfig populatedTask = ITaskConfig.build(populatedTask()); - AddInstancesConfig config = createInstanceConfig(populatedTask.newBuilder()); + IScheduledTask activeTask = buildScheduledTask(); + ITaskConfig task = activeTask.getAssignedTask().getTask(); expectNoCronJob(); - lockManager.validateIfLocked(LOCK_KEY, java.util.Optional.of(LOCK)); - storageUtil.expectTaskFetch(Query.jobScoped(JOB_KEY).active()); - expect(taskIdGenerator.generate(populatedTask, 1)) + lockManager.validateIfLocked(LOCK_KEY, java.util.Optional.empty()); + storageUtil.expectTaskFetch(Query.jobScoped(JOB_KEY).active(), activeTask); + expect(taskIdGenerator.generate(task, 2)) .andReturn(TASK_ID); - expectInstanceQuotaCheck(populatedTask, NOT_ENOUGH_QUOTA); + expectInstanceQuotaCheck(task, NOT_ENOUGH_QUOTA); control.replay(); - assertResponse(INVALID_REQUEST, thrift.addInstances(config, LOCK.newBuilder())); + assertResponse(INVALID_REQUEST, newAddInstances(INSTANCE_KEY, 1)); } @Test public void testAddInstancesInstanceCollisionFailure() throws Exception { - ITaskConfig populatedTask = ITaskConfig.build(populatedTask()); - AddInstancesConfig config = createInstanceConfig(populatedTask.newBuilder()); + IScheduledTask activeTask = buildScheduledTask(); + ITaskConfig task = activeTask.getAssignedTask().getTask(); expectNoCronJob(); - lockManager.validateIfLocked(LOCK_KEY, java.util.Optional.of(LOCK)); - storageUtil.expectTaskFetch(Query.jobScoped(JOB_KEY).active()); - expect(taskIdGenerator.generate(populatedTask, 1)) + lockManager.validateIfLocked(LOCK_KEY, java.util.Optional.empty()); + storageUtil.expectTaskFetch(Query.jobScoped(JOB_KEY).active(), activeTask); + expect(taskIdGenerator.generate(task, 2)) .andReturn(TASK_ID); - expectInstanceQuotaCheck(populatedTask, ENOUGH_QUOTA); + expectInstanceQuotaCheck(task, ENOUGH_QUOTA); stateManager.insertPendingTasks( storageUtil.mutableStoreProvider, - populatedTask, - ImmutableSet.of(0)); + task, + ImmutableSet.of(1)); expectLastCall().andThrow(new IllegalArgumentException("instance collision")); control.replay(); - assertResponse(INVALID_REQUEST, thrift.addInstances(config, LOCK.newBuilder())); + assertResponse(INVALID_REQUEST, newAddInstances(INSTANCE_KEY, 1)); + } + + private Response newAddInstances(InstanceKey key, int count) throws Exception { + return thrift.addInstances(null, null, key, count); + } + + private Response deprecatedAddInstances(AddInstancesConfig config, Lock lock) throws Exception { + return thrift.addInstances(config, lock, null, 0); } @Test @@ -1725,8 +1787,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest { @Test public void testStartUpdateFailsConfigValidation() throws Exception { - JobUpdateRequest request = - buildJobUpdateRequest(populatedTask().setIsService(true).setNumCpus(-1)); + JobUpdateRequest request = buildJobUpdateRequest(INVALID_TASK_CONFIG.setIsService(true)); control.replay(); assertResponse(INVALID_REQUEST, thrift.startJobUpdate(request, AUDIT_MESSAGE));
