Repository: aurora Updated Branches: refs/heads/master d7a1619fa -> 2e2371481
http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java index 2d7c223..9aef59a 100644 --- a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java +++ b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java @@ -92,7 +92,7 @@ import org.apache.aurora.scheduler.state.StateManager; import org.apache.aurora.scheduler.state.UUIDGenerator; import org.apache.aurora.scheduler.storage.CronJobStore; import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; -import org.apache.aurora.scheduler.storage.Storage.MutateWork; +import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage; import org.apache.aurora.scheduler.storage.Storage.StoreProvider; import org.apache.aurora.scheduler.storage.backup.Recovery; @@ -230,38 +230,35 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin { return invalidRequest(NO_CRON); } - return storage.write(new MutateWork.Quiet<Response>() { - @Override - public Response apply(MutableStoreProvider storeProvider) { - IJobConfiguration job = sanitized.getJobConfig(); - - try { - lockManager.validateIfLocked( - ILockKey.build(LockKey.job(job.getKey().newBuilder())), - java.util.Optional.ofNullable(mutableLock).map(ILock::build)); - - checkJobExists(storeProvider, job.getKey()); - - ITaskConfig template = sanitized.getJobConfig().getTaskConfig(); - int count = sanitized.getJobConfig().getInstanceCount(); - - validateTaskLimits( - template, - count, - quotaManager.checkInstanceAddition(template, count, storeProvider)); - - LOG.info("Launching " + count + " tasks."); - stateManager.insertPendingTasks( - storeProvider, - template, - sanitized.getInstanceIds()); - - return ok(); - } catch (LockException e) { - return error(LOCK_ERROR, e); - } catch (JobExistsException | TaskValidationException e) { - return error(INVALID_REQUEST, e); - } + return storage.write(storeProvider -> { + IJobConfiguration job = sanitized.getJobConfig(); + + try { + lockManager.validateIfLocked( + ILockKey.build(LockKey.job(job.getKey().newBuilder())), + java.util.Optional.ofNullable(mutableLock).map(ILock::build)); + + checkJobExists(storeProvider, job.getKey()); + + ITaskConfig template = sanitized.getJobConfig().getTaskConfig(); + int count = sanitized.getJobConfig().getInstanceCount(); + + validateTaskLimits( + template, + count, + quotaManager.checkInstanceAddition(template, count, storeProvider)); + + LOG.info("Launching " + count + " tasks."); + stateManager.insertPendingTasks( + storeProvider, + template, + sanitized.getInstanceIds()); + + return ok(); + } catch (LockException e) { + return error(LOCK_ERROR, e); + } catch (JobExistsException | TaskValidationException e) { + return error(INVALID_REQUEST, e); } }); } @@ -299,37 +296,34 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin { return invalidRequest(noCronScheduleMessage(jobKey)); } - return storage.write(new MutateWork.Quiet<Response>() { - @Override - public Response apply(MutableStoreProvider storeProvider) { - try { - lockManager.validateIfLocked( - ILockKey.build(LockKey.job(jobKey.newBuilder())), - java.util.Optional.ofNullable(mutableLock).map(ILock::build)); - - ITaskConfig template = sanitized.getJobConfig().getTaskConfig(); - int count = sanitized.getJobConfig().getInstanceCount(); - - validateTaskLimits( - template, - count, - quotaManager.checkCronUpdate(sanitized.getJobConfig(), storeProvider)); - - // TODO(mchucarroll): Merge CronJobManager.createJob/updateJob - if (updateOnly || getCronJob(storeProvider, jobKey).isPresent()) { - // The job already has a schedule: so update it. - cronJobManager.updateJob(SanitizedCronJob.from(sanitized)); - } else { - checkJobExists(storeProvider, jobKey); - cronJobManager.createJob(SanitizedCronJob.from(sanitized)); - } - - return ok(); - } catch (LockException e) { - return error(LOCK_ERROR, e); - } catch (JobExistsException | TaskValidationException | CronException e) { - return error(INVALID_REQUEST, e); + return storage.write(storeProvider -> { + try { + lockManager.validateIfLocked( + ILockKey.build(LockKey.job(jobKey.newBuilder())), + java.util.Optional.ofNullable(mutableLock).map(ILock::build)); + + ITaskConfig template = sanitized.getJobConfig().getTaskConfig(); + int count = sanitized.getJobConfig().getInstanceCount(); + + validateTaskLimits( + template, + count, + quotaManager.checkCronUpdate(sanitized.getJobConfig(), storeProvider)); + + // TODO(mchucarroll): Merge CronJobManager.createJob/updateJob + if (updateOnly || getCronJob(storeProvider, jobKey).isPresent()) { + // The job already has a schedule: so update it. + cronJobManager.updateJob(SanitizedCronJob.from(sanitized)); + } else { + checkJobExists(storeProvider, jobKey); + cronJobManager.createJob(SanitizedCronJob.from(sanitized)); } + + return ok(); + } catch (LockException e) { + return error(LOCK_ERROR, e); + } catch (JobExistsException | TaskValidationException | CronException e) { + return error(INVALID_REQUEST, e); } }); } @@ -451,34 +445,31 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin { !query.get().isSetOwner(), "The owner field in a query should have been unset by Query.Builder."); - return storage.write(new MutateWork.Quiet<Response>() { - @Override - public Response apply(MutableStoreProvider storeProvider) { - Iterable<IScheduledTask> tasks = storeProvider.getTaskStore().fetchTasks(query); - try { - validateLockForTasks( - java.util.Optional.ofNullable(mutableLock).map(ILock::build), - tasks); - } catch (LockException e) { - return error(LOCK_ERROR, e); - } - - LOG.info("Killing tasks matching " + query); + return storage.write(storeProvider -> { + Iterable<IScheduledTask> tasks = storeProvider.getTaskStore().fetchTasks(query); + try { + validateLockForTasks( + java.util.Optional.ofNullable(mutableLock).map(ILock::build), + tasks); + } catch (LockException e) { + return error(LOCK_ERROR, e); + } - boolean tasksKilled = false; - for (String taskId : Tasks.ids(tasks)) { - tasksKilled |= StateChangeResult.SUCCESS == stateManager.changeState( - storeProvider, - taskId, - Optional.absent(), - ScheduleStatus.KILLING, - auditMessages.killedByRemoteUser()); - } + LOG.info("Killing tasks matching " + query); - return tasksKilled - ? ok() - : addMessage(empty(), OK, NO_TASKS_TO_KILL_MESSAGE); + boolean tasksKilled = false; + for (String taskId : Tasks.ids(tasks)) { + tasksKilled |= StateChangeResult.SUCCESS == stateManager.changeState( + storeProvider, + taskId, + Optional.absent(), + ScheduleStatus.KILLING, + auditMessages.killedByRemoteUser()); } + + return tasksKilled + ? ok() + : addMessage(empty(), OK, NO_TASKS_TO_KILL_MESSAGE); }); } @@ -491,39 +482,31 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin { IJobKey jobKey = JobKeys.assertValid(IJobKey.build(mutableJobKey)); checkNotBlank(shardIds); - return storage.write(new MutateWork.Quiet<Response>() { - @Override - public Response apply(MutableStoreProvider storeProvider) { - try { - lockManager.validateIfLocked( - ILockKey.build(LockKey.job(jobKey.newBuilder())), - java.util.Optional.ofNullable(mutableLock).map(ILock::build)); - } catch (LockException e) { - return error(LOCK_ERROR, e); - } + return storage.write(storeProvider -> { + try { + lockManager.validateIfLocked( + ILockKey.build(LockKey.job(jobKey.newBuilder())), + java.util.Optional.ofNullable(mutableLock).map(ILock::build)); + } catch (LockException e) { + return error(LOCK_ERROR, e); + } - Query.Builder query = Query.instanceScoped(jobKey, shardIds).active(); - Iterable<IScheduledTask> matchingTasks = storeProvider.getTaskStore().fetchTasks(query); - if (Iterables.size(matchingTasks) != shardIds.size()) { - return invalidRequest("Not all requested shards are active."); - } + Query.Builder query = Query.instanceScoped(jobKey, shardIds).active(); + Iterable<IScheduledTask> matchingTasks = storeProvider.getTaskStore().fetchTasks(query); + if (Iterables.size(matchingTasks) != shardIds.size()) { + return invalidRequest("Not all requested shards are active."); + } - LOG.info("Restarting shards matching " + query); - storage.write(new MutateWork.NoResult.Quiet() { - @Override - public void execute(MutableStoreProvider storeProvider) { - for (String taskId : Tasks.ids(matchingTasks)) { - stateManager.changeState( - storeProvider, - taskId, - Optional.absent(), - ScheduleStatus.RESTARTING, - auditMessages.restartedByRemoteUser()); - } - } - }); - return ok(); + LOG.info("Restarting shards matching " + query); + for (String taskId : Tasks.ids(matchingTasks)) { + stateManager.changeState( + storeProvider, + taskId, + Optional.absent(), + ScheduleStatus.RESTARTING, + auditMessages.restartedByRemoteUser()); } + return ok(); }); } @@ -538,15 +521,10 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin { requireNonNull(resourceAggregate); try { - storage.write(new MutateWork.NoResult<QuotaException>() { - @Override - public void execute(MutableStoreProvider store) throws QuotaException { - quotaManager.saveQuota( - ownerRole, - IResourceAggregate.build(resourceAggregate), - store); - } - }); + storage.write((NoResult<QuotaException>) store -> quotaManager.saveQuota( + ownerRole, + IResourceAggregate.build(resourceAggregate), + store)); return ok(); } catch (QuotaException e) { return error(INVALID_REQUEST, e); @@ -584,17 +562,12 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin { checkNotBlank(taskId); requireNonNull(status); - storage.write(new MutateWork.NoResult.Quiet() { - @Override - public void execute(MutableStoreProvider storeProvider) { - stateManager.changeState( - storeProvider, - taskId, - Optional.absent(), - status, - auditMessages.transitionedBy()); - } - }); + storage.write(storeProvider -> stateManager.changeState( + storeProvider, + taskId, + Optional.absent(), + status, + auditMessages.transitionedBy())); return ok(); } @@ -653,28 +626,25 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin { return addMessage(empty(), INVALID_REQUEST, "No rewrite commands provided."); } - return storage.write(new MutateWork.Quiet<Response>() { - @Override - public Response apply(MutableStoreProvider storeProvider) { - List<String> errors = Lists.newArrayList(); + return storage.write(storeProvider -> { + List<String> errors = Lists.newArrayList(); - for (ConfigRewrite command : request.getRewriteCommands()) { - Optional<String> error = rewriteConfig(IConfigRewrite.build(command), storeProvider); - if (error.isPresent()) { - errors.add(error.get()); - } + for (ConfigRewrite command : request.getRewriteCommands()) { + Optional<String> error = rewriteConfig(IConfigRewrite.build(command), storeProvider); + if (error.isPresent()) { + errors.add(error.get()); } + } - Response resp = empty(); - if (errors.isEmpty()) { - resp.setResponseCode(OK); - } else { - for (String error : errors) { - addMessage(resp, WARNING, error); - } + Response resp = empty(); + if (errors.isEmpty()) { + resp.setResponseCode(OK); + } else { + for (String error : errors) { + addMessage(resp, WARNING, error); } - return resp; } + return resp; }); } @@ -778,42 +748,34 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin { return error(INVALID_REQUEST, e); } - return storage.write(new MutateWork.Quiet<Response>() { - @Override - public Response apply(MutableStoreProvider storeProvider) { - try { - if (getCronJob(storeProvider, jobKey).isPresent()) { - return invalidRequest("Instances may not be added to cron jobs."); - } - - lockManager.validateIfLocked( - ILockKey.build(LockKey.job(jobKey.newBuilder())), - java.util.Optional.ofNullable(mutableLock).map(ILock::build)); - - Iterable<IScheduledTask> currentTasks = storeProvider.getTaskStore().fetchTasks( - Query.jobScoped(task.getJob()).active()); - - validateTaskLimits( - task, - Iterables.size(currentTasks) + config.getInstanceIdsSize(), - quotaManager.checkInstanceAddition(task, config.getInstanceIdsSize(), storeProvider)); - - storage.write(new NoResult.Quiet() { - @Override - public void execute(MutableStoreProvider storeProvider) { - stateManager.insertPendingTasks( - storeProvider, - task, - ImmutableSet.copyOf(config.getInstanceIds())); - } - }); - - return ok(); - } catch (LockException e) { - return error(LOCK_ERROR, e); - } catch (TaskValidationException | IllegalArgumentException e) { - return error(INVALID_REQUEST, e); + return storage.write(storeProvider -> { + try { + if (getCronJob(storeProvider, jobKey).isPresent()) { + return invalidRequest("Instances may not be added to cron jobs."); } + + lockManager.validateIfLocked( + ILockKey.build(LockKey.job(jobKey.newBuilder())), + java.util.Optional.ofNullable(mutableLock).map(ILock::build)); + + Iterable<IScheduledTask> currentTasks = storeProvider.getTaskStore().fetchTasks( + Query.jobScoped(task.getJob()).active()); + + validateTaskLimits( + task, + Iterables.size(currentTasks) + config.getInstanceIdsSize(), + quotaManager.checkInstanceAddition(task, config.getInstanceIdsSize(), storeProvider)); + + stateManager.insertPendingTasks( + storeProvider, + task, + ImmutableSet.copyOf(config.getInstanceIds())); + + return ok(); + } catch (LockException e) { + return error(LOCK_ERROR, e); + } catch (TaskValidationException | IllegalArgumentException e) { + return error(INVALID_REQUEST, e); } }); } @@ -960,67 +922,64 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin { return error(INVALID_REQUEST, e); } - return storage.write(new MutateWork.Quiet<Response>() { - @Override - public Response apply(MutableStoreProvider storeProvider) { - if (getCronJob(storeProvider, job).isPresent()) { - return invalidRequest(NO_CRON); - } + return storage.write(storeProvider -> { + if (getCronJob(storeProvider, job).isPresent()) { + return invalidRequest(NO_CRON); + } - String updateId = uuidGenerator.createNew().toString(); - IJobUpdateSettings settings = request.getSettings(); - - JobDiff diff = JobDiff.compute( - storeProvider.getTaskStore(), - job, - JobDiff.asMap(request.getTaskConfig(), request.getInstanceCount()), - settings.getUpdateOnlyTheseInstances()); - - Set<Integer> invalidScope = diff.getOutOfScopeInstances( - Numbers.rangesToInstanceIds(settings.getUpdateOnlyTheseInstances())); - if (!invalidScope.isEmpty()) { - return invalidRequest( - "The update request attempted to update specific instances," - + " but some are irrelevant to the update and current job state: " - + invalidScope); - } + String updateId = uuidGenerator.createNew().toString(); + IJobUpdateSettings settings1 = request.getSettings(); + + JobDiff diff = JobDiff.compute( + storeProvider.getTaskStore(), + job, + JobDiff.asMap(request.getTaskConfig(), request.getInstanceCount()), + settings1.getUpdateOnlyTheseInstances()); + + Set<Integer> invalidScope = diff.getOutOfScopeInstances( + Numbers.rangesToInstanceIds(settings1.getUpdateOnlyTheseInstances())); + if (!invalidScope.isEmpty()) { + return invalidRequest( + "The update request attempted to update specific instances," + + " but some are irrelevant to the update and current job state: " + + invalidScope); + } - if (diff.isNoop()) { - return addMessage(empty(), OK, NOOP_JOB_UPDATE_MESSAGE); - } + if (diff.isNoop()) { + return addMessage(empty(), OK, NOOP_JOB_UPDATE_MESSAGE); + } - JobUpdateInstructions instructions = new JobUpdateInstructions() - .setSettings(settings.newBuilder()) - .setInitialState(buildInitialState(diff.getReplacedInstances())); + JobUpdateInstructions instructions = new JobUpdateInstructions() + .setSettings(settings1.newBuilder()) + .setInitialState(buildInitialState(diff.getReplacedInstances())); - Set<Integer> replacements = diff.getReplacementInstances(); - if (!replacements.isEmpty()) { - instructions.setDesiredState( - new InstanceTaskConfig() - .setTask(request.getTaskConfig().newBuilder()) - .setInstances(IRange.toBuildersSet(convertRanges(toRanges(replacements))))); - } + Set<Integer> replacements = diff.getReplacementInstances(); + if (!replacements.isEmpty()) { + instructions.setDesiredState( + new InstanceTaskConfig() + .setTask(request.getTaskConfig().newBuilder()) + .setInstances(IRange.toBuildersSet(convertRanges(toRanges(replacements))))); + } - String remoteUserName = auditMessages.getRemoteUserName(); - IJobUpdate update = IJobUpdate.build(new JobUpdate() - .setSummary(new JobUpdateSummary() - .setKey(new JobUpdateKey(job.newBuilder(), updateId)) - .setUser(remoteUserName)) - .setInstructions(instructions)); - try { - validateTaskLimits( - request.getTaskConfig(), - request.getInstanceCount(), - quotaManager.checkJobUpdate(update, storeProvider)); - - jobUpdateController.start( - update, - new AuditData(remoteUserName, Optional.fromNullable(message))); - return ok(Result.startJobUpdateResult( - new StartJobUpdateResult(update.getSummary().getKey().newBuilder()))); - } catch (UpdateStateException | TaskValidationException e) { - return error(INVALID_REQUEST, e); - } + String remoteUserName = auditMessages.getRemoteUserName(); + IJobUpdate update = IJobUpdate.build(new JobUpdate() + .setSummary(new JobUpdateSummary() + .setKey(new JobUpdateKey(job.newBuilder(), updateId)) + .setUser(remoteUserName)) + .setInstructions(instructions)); + try { + validateTaskLimits( + request.getTaskConfig(), + request.getInstanceCount(), + quotaManager.checkJobUpdate(update, storeProvider)); + + jobUpdateController.start( + update, + new AuditData(remoteUserName, Optional.fromNullable(message))); + return ok(Result.startJobUpdateResult( + new StartJobUpdateResult(update.getSummary().getKey().newBuilder()))); + } catch (UpdateStateException | TaskValidationException e) { + return error(INVALID_REQUEST, e); } }); } @@ -1032,18 +991,15 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin { IJobUpdateKey key = IJobUpdateKey.build(mutableKey); JobKeys.assertValid(key.getJob()); - return storage.write(new MutateWork.Quiet<Response>() { - @Override - public Response apply(MutableStoreProvider storeProvider) { - try { - change.modifyUpdate( - jobUpdateController, - key, - new AuditData(auditMessages.getRemoteUserName(), message)); - return ok(); - } catch (UpdateStateException e) { - return error(INVALID_REQUEST, e); - } + return storage.write(storeProvider -> { + try { + change.modifyUpdate( + jobUpdateController, + key, + new AuditData(auditMessages.getRemoteUserName(), message)); + return ok(); + } catch (UpdateStateException e) { + return error(INVALID_REQUEST, e); } }); } http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/thrift/aop/AopModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/aop/AopModule.java b/src/main/java/org/apache/aurora/scheduler/thrift/aop/AopModule.java index 79b677d..7f29b79 100644 --- a/src/main/java/org/apache/aurora/scheduler/thrift/aop/AopModule.java +++ b/src/main/java/org/apache/aurora/scheduler/thrift/aop/AopModule.java @@ -72,12 +72,7 @@ public class AopModule extends AbstractModule { this.toggledMethods = ImmutableMap.copyOf(toggledMethods); } - private static final Function<Method, String> GET_NAME = new Function<Method, String>() { - @Override - public String apply(Method method) { - return method.getName(); - } - }; + private static final Function<Method, String> GET_NAME = Method::getName; @Override protected void configure() { http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/thrift/aop/LoggingInterceptor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/aop/LoggingInterceptor.java b/src/main/java/org/apache/aurora/scheduler/thrift/aop/LoggingInterceptor.java index a7b9b07..10e0a5a 100644 --- a/src/main/java/org/apache/aurora/scheduler/thrift/aop/LoggingInterceptor.java +++ b/src/main/java/org/apache/aurora/scheduler/thrift/aop/LoggingInterceptor.java @@ -42,16 +42,13 @@ class LoggingInterceptor implements MethodInterceptor { private final Map<Class<?>, Function<Object, String>> printFunctions = ImmutableMap.of( JobConfiguration.class, - new Function<Object, String>() { - @Override - public String apply(Object input) { - JobConfiguration configuration = ((JobConfiguration) input).deepCopy(); - if (configuration.isSetTaskConfig()) { - configuration.getTaskConfig().setExecutorConfig( - new ExecutorConfig("BLANKED", "BLANKED")); - } - return configuration.toString(); + input -> { + JobConfiguration configuration = ((JobConfiguration) input).deepCopy(); + if (configuration.isSetTaskConfig()) { + configuration.getTaskConfig().setExecutorConfig( + new ExecutorConfig("BLANKED", "BLANKED")); } + return configuration.toString(); } ); http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/updater/JobDiff.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/updater/JobDiff.java b/src/main/java/org/apache/aurora/scheduler/updater/JobDiff.java index ca25388..7257ee8 100644 --- a/src/main/java/org/apache/aurora/scheduler/updater/JobDiff.java +++ b/src/main/java/org/apache/aurora/scheduler/updater/JobDiff.java @@ -26,6 +26,7 @@ import com.google.common.collect.DiscreteDomain; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.MapDifference; +import com.google.common.collect.MapDifference.ValueDifference; import com.google.common.collect.Maps; import com.google.common.collect.Range; import com.google.common.collect.Sets; @@ -130,12 +131,7 @@ public final class JobDiff { } private static <V> Function<MapDifference.ValueDifference<V>, V> leftValue() { - return new Function<MapDifference.ValueDifference<V>, V>() { - @Override - public V apply(MapDifference.ValueDifference<V> diff) { - return diff.leftValue(); - } - }; + return ValueDifference::leftValue; } private static JobDiff computeUnscoped( http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java index 43ff094..94b6127 100644 --- a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java +++ b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java @@ -53,6 +53,7 @@ import org.apache.aurora.scheduler.state.LockManager.LockException; import org.apache.aurora.scheduler.state.StateManager; import org.apache.aurora.scheduler.storage.JobUpdateStore; import org.apache.aurora.scheduler.storage.Storage; +import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; import org.apache.aurora.scheduler.storage.TaskStore; import org.apache.aurora.scheduler.storage.entities.IInstanceKey; import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent; @@ -81,7 +82,6 @@ import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_BACK; import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_FORWARD; import static org.apache.aurora.gen.JobUpdateStatus.ROLL_FORWARD_AWAITING_PULSE; import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; -import static org.apache.aurora.scheduler.storage.Storage.MutateWork; import static org.apache.aurora.scheduler.updater.JobUpdateStateMachine.ACTIVE_QUERY; import static org.apache.aurora.scheduler.updater.JobUpdateStateMachine.AUTO_RESUME_STATES; import static org.apache.aurora.scheduler.updater.JobUpdateStateMachine.GET_ACTIVE_RESUME_STATE; @@ -146,55 +146,51 @@ class JobUpdateControllerImpl implements JobUpdateController { requireNonNull(update); requireNonNull(auditData); - storage.write(new MutateWork.NoResult<UpdateStateException>() { - @Override - public void execute(MutableStoreProvider storeProvider) - throws UpdateStateException { + storage.write((NoResult<UpdateStateException>) (MutableStoreProvider storeProvider) -> { - IJobUpdateSummary summary = update.getSummary(); - IJobUpdateInstructions instructions = update.getInstructions(); - IJobKey job = summary.getKey().getJob(); + IJobUpdateSummary summary = update.getSummary(); + IJobUpdateInstructions instructions = update.getInstructions(); + IJobKey job = summary.getKey().getJob(); - // Validate the update configuration by making sure we can create an updater for it. - updateFactory.newUpdate(update.getInstructions(), true); + // Validate the update configuration by making sure we can create an updater for it. + updateFactory.newUpdate(update.getInstructions(), true); - if (instructions.getInitialState().isEmpty() && !instructions.isSetDesiredState()) { - throw new IllegalArgumentException("Update instruction is a no-op."); - } - - List<IJobUpdateSummary> activeJobUpdates = - storeProvider.getJobUpdateStore().fetchJobUpdateSummaries(queryActiveByJob(job)); - if (!activeJobUpdates.isEmpty()) { - throw new UpdateStateException("An active update already exists for this job, " - + "please terminate it before starting another. " - + "Active updates are those in states " + Updates.ACTIVE_JOB_UPDATE_STATES); - } + if (instructions.getInitialState().isEmpty() && !instructions.isSetDesiredState()) { + throw new IllegalArgumentException("Update instruction is a no-op."); + } - LOG.info("Starting update for job " + job); - ILock lock; - try { - lock = lockManager.acquireLock( - ILockKey.build(LockKey.job(job.newBuilder())), - auditData.getUser()); - } catch (LockException e) { - throw new UpdateStateException(e.getMessage(), e); - } + List<IJobUpdateSummary> activeJobUpdates = + storeProvider.getJobUpdateStore().fetchJobUpdateSummaries(queryActiveByJob(job)); + if (!activeJobUpdates.isEmpty()) { + throw new UpdateStateException("An active update already exists for this job, " + + "please terminate it before starting another. " + + "Active updates are those in states " + Updates.ACTIVE_JOB_UPDATE_STATES); + } - storeProvider.getJobUpdateStore().saveJobUpdate( - update, - Optional.of(requireNonNull(lock.getToken()))); + LOG.info("Starting update for job " + job); + ILock lock; + try { + lock = lockManager.acquireLock( + ILockKey.build(LockKey.job(job.newBuilder())), + auditData.getUser()); + } catch (LockException e) { + throw new UpdateStateException(e.getMessage(), e); + } - JobUpdateStatus status = ROLLING_FORWARD; - if (isCoordinatedUpdate(instructions)) { - status = ROLL_FORWARD_AWAITING_PULSE; - pulseHandler.initializePulseState(update, status); - } + storeProvider.getJobUpdateStore().saveJobUpdate( + update, + Optional.of(requireNonNull(lock.getToken()))); - recordAndChangeJobUpdateStatus( - storeProvider, - summary.getKey(), - addAuditData(newEvent(status), auditData)); + JobUpdateStatus status = ROLLING_FORWARD; + if (isCoordinatedUpdate(instructions)) { + status = ROLL_FORWARD_AWAITING_PULSE; + pulseHandler.initializePulseState(update, status); } + + recordAndChangeJobUpdateStatus( + storeProvider, + summary.getKey(), + addAuditData(newEvent(status), auditData)); }); } @@ -214,29 +210,26 @@ class JobUpdateControllerImpl implements JobUpdateController { requireNonNull(key); requireNonNull(auditData); LOG.info("Attempting to resume update " + key); - storage.write(new MutateWork.NoResult<UpdateStateException>() { - @Override - public void execute(MutableStoreProvider storeProvider) throws UpdateStateException { - IJobUpdateDetails details = Iterables.getOnlyElement( - storeProvider.getJobUpdateStore().fetchJobUpdateDetails(queryByUpdate(key)), null); - - if (details == null) { - throw new UpdateStateException("Update does not exist: " + key); - } - - IJobUpdate update = details.getUpdate(); - IJobUpdateKey key = update.getSummary().getKey(); - Function<JobUpdateStatus, JobUpdateStatus> stateChange = - isCoordinatedAndPulseExpired(key, update.getInstructions()) - ? GET_BLOCKED_RESUME_STATE - : GET_ACTIVE_RESUME_STATE; + storage.write((NoResult<UpdateStateException>) (MutableStoreProvider storeProvider) -> { + IJobUpdateDetails details = Iterables.getOnlyElement( + storeProvider.getJobUpdateStore().fetchJobUpdateDetails(queryByUpdate(key)), null); - JobUpdateStatus newStatus = stateChange.apply(update.getSummary().getState().getStatus()); - changeUpdateStatus( - storeProvider, - update.getSummary(), - addAuditData(newEvent(newStatus), auditData)); + if (details == null) { + throw new UpdateStateException("Update does not exist: " + key); } + + IJobUpdate update = details.getUpdate(); + IJobUpdateKey key1 = update.getSummary().getKey(); + Function<JobUpdateStatus, JobUpdateStatus> stateChange = + isCoordinatedAndPulseExpired(key1, update.getInstructions()) + ? GET_BLOCKED_RESUME_STATE + : GET_ACTIVE_RESUME_STATE; + + JobUpdateStatus newStatus = stateChange.apply(update.getSummary().getState().getStatus()); + changeUpdateStatus( + storeProvider, + update.getSummary(), + addAuditData(newEvent(newStatus), auditData)); }); } @@ -250,40 +243,32 @@ class JobUpdateControllerImpl implements JobUpdateController { private static Function<JobUpdateStatus, JobUpdateEvent> createAuditedEvent( final AuditData auditData) { - return new Function<JobUpdateStatus, JobUpdateEvent>() { - @Override - public JobUpdateEvent apply(JobUpdateStatus status) { - return addAuditData(newEvent(status), auditData); - } - }; + return status -> addAuditData(newEvent(status), auditData); } @Override public void systemResume() { - storage.write(new MutateWork.NoResult.Quiet() { - @Override - public void execute(MutableStoreProvider storeProvider) { - for (IJobUpdateDetails details - : storeProvider.getJobUpdateStore().fetchJobUpdateDetails(ACTIVE_QUERY)) { - - IJobUpdateSummary summary = details.getUpdate().getSummary(); - IJobUpdateInstructions instructions = details.getUpdate().getInstructions(); - IJobUpdateKey key = summary.getKey(); - JobUpdateStatus status = summary.getState().getStatus(); - - if (isCoordinatedUpdate(instructions)) { - LOG.info("Automatically restoring pulse state for " + key); - pulseHandler.initializePulseState(details.getUpdate(), status); - } + storage.write((NoResult.Quiet) (MutableStoreProvider storeProvider) -> { + for (IJobUpdateDetails details + : storeProvider.getJobUpdateStore().fetchJobUpdateDetails(ACTIVE_QUERY)) { + + IJobUpdateSummary summary = details.getUpdate().getSummary(); + IJobUpdateInstructions instructions = details.getUpdate().getInstructions(); + IJobUpdateKey key = summary.getKey(); + JobUpdateStatus status = summary.getState().getStatus(); + + if (isCoordinatedUpdate(instructions)) { + LOG.info("Automatically restoring pulse state for " + key); + pulseHandler.initializePulseState(details.getUpdate(), status); + } - if (AUTO_RESUME_STATES.contains(status)) { - LOG.info("Automatically resuming update " + key); + if (AUTO_RESUME_STATES.contains(status)) { + LOG.info("Automatically resuming update " + key); - try { - changeJobUpdateStatus(storeProvider, key, newEvent(status), false); - } catch (UpdateStateException e) { - throw Throwables.propagate(e); - } + try { + changeJobUpdateStatus(storeProvider, key, newEvent(status), false); + } catch (UpdateStateException e) { + throw Throwables.propagate(e); } } } @@ -305,21 +290,13 @@ class JobUpdateControllerImpl implements JobUpdateController { if (JobUpdateStateMachine.isAwaitingPulse(state.getStatus())) { // Attempt to unblock a job update previously blocked on expired pulse. - executor.execute(new Runnable() { - @Override - public void run() { - try { - unscopedChangeUpdateStatus( - key, - new Function<JobUpdateStatus, JobUpdateEvent>() { - @Override - public JobUpdateEvent apply(JobUpdateStatus status) { - return new JobUpdateEvent().setStatus(GET_UNBLOCKED_STATE.apply(status)); - } - }); - } catch (UpdateStateException e) { - LOG.severe("Error while processing job update pulse: " + e); - } + executor.execute(() -> { + try { + unscopedChangeUpdateStatus( + key, + status -> new JobUpdateEvent().setStatus(GET_UNBLOCKED_STATE.apply(status))); + } catch (UpdateStateException e) { + LOG.severe("Error while processing job update pulse: " + e); } }); } @@ -344,27 +321,24 @@ class JobUpdateControllerImpl implements JobUpdateController { } private void instanceChanged(final IInstanceKey instance, final Optional<IScheduledTask> state) { - storage.write(new MutateWork.NoResult.Quiet() { - @Override - public void execute(MutableStoreProvider storeProvider) { - IJobKey job = instance.getJobKey(); - UpdateFactory.Update update = updates.get(job); - if (update != null) { - if (update.getUpdater().containsInstance(instance.getInstanceId())) { - LOG.info("Forwarding task change for " + InstanceKeys.toString(instance)); - try { - evaluateUpdater( - storeProvider, - update, - getOnlyMatch(storeProvider.getJobUpdateStore(), queryActiveByJob(job)), - ImmutableMap.of(instance.getInstanceId(), state)); - } catch (UpdateStateException e) { - throw Throwables.propagate(e); - } - } else { - LOG.info("Instance " + instance + " is not part of active update for " - + JobKeys.canonicalString(job)); + storage.write((NoResult.Quiet) (MutableStoreProvider storeProvider) -> { + IJobKey job = instance.getJobKey(); + UpdateFactory.Update update = updates.get(job); + if (update != null) { + if (update.getUpdater().containsInstance(instance.getInstanceId())) { + LOG.info("Forwarding task change for " + InstanceKeys.toString(instance)); + try { + evaluateUpdater( + storeProvider, + update, + getOnlyMatch(storeProvider.getJobUpdateStore(), queryActiveByJob(job)), + ImmutableMap.of(instance.getInstanceId(), state)); + } catch (UpdateStateException e) { + throw Throwables.propagate(e); } + } else { + LOG.info("Instance " + instance + " is not part of active update for " + + JobKeys.canonicalString(job)); } } }); @@ -396,19 +370,15 @@ class JobUpdateControllerImpl implements JobUpdateController { final Function<? super JobUpdateStatus, JobUpdateEvent> stateChange) throws UpdateStateException { - storage.write(new MutateWork.NoResult<UpdateStateException>() { - @Override - public void execute(MutableStoreProvider storeProvider) - throws UpdateStateException { + storage.write((NoResult<UpdateStateException>) (MutableStoreProvider storeProvider) -> { - IJobUpdateSummary update = Iterables.getOnlyElement( - storeProvider.getJobUpdateStore().fetchJobUpdateSummaries(queryByUpdate(key)), null); - if (update == null) { - throw new UpdateStateException("Update does not exist " + key); - } - - changeUpdateStatus(storeProvider, update, stateChange.apply(update.getState().getStatus())); + IJobUpdateSummary update = Iterables.getOnlyElement( + storeProvider.getJobUpdateStore().fetchJobUpdateSummaries(queryByUpdate(key)), null); + if (update == null) { + throw new UpdateStateException("Update does not exist " + key); } + + changeUpdateStatus(storeProvider, update, stateChange.apply(update.getState().getStatus())); }); } @@ -579,12 +549,7 @@ class JobUpdateControllerImpl implements JobUpdateController { } InstanceStateProvider<Integer, Optional<IScheduledTask>> stateProvider = - new InstanceStateProvider<Integer, Optional<IScheduledTask>>() { - @Override - public Optional<IScheduledTask> getState(Integer instanceId) { - return getActiveInstance(storeProvider.getTaskStore(), key.getJob(), instanceId); - } - }; + instanceId -> getActiveInstance(storeProvider.getTaskStore(), key.getJob(), instanceId); EvaluationResult<Integer> result = update.getUpdater().evaluate(changedInstance, stateProvider); @@ -683,12 +648,7 @@ class JobUpdateControllerImpl implements JobUpdateController { @VisibleForTesting static final Function<IJobInstanceUpdateEvent, JobUpdateAction> EVENT_TO_ACTION = - new Function<IJobInstanceUpdateEvent, JobUpdateAction>() { - @Override - public JobUpdateAction apply(IJobInstanceUpdateEvent event) { - return event.getAction(); - } - }; + IJobInstanceUpdateEvent::getAction; @VisibleForTesting static String failureMessage(int instanceId, Failure failure) { @@ -735,37 +695,29 @@ class JobUpdateControllerImpl implements JobUpdateController { } private Runnable getDeferredEvaluator(final IInstanceKey instance, final IJobUpdateKey key) { - return new Runnable() { - @Override - public void run() { - storage.write(new MutateWork.NoResult.Quiet() { - @Override - public void execute(MutableStoreProvider storeProvider) { - IJobUpdateSummary summary = - getOnlyMatch(storeProvider.getJobUpdateStore(), queryByUpdate(key)); - JobUpdateStatus status = summary.getState().getStatus(); - // Suppress this evaluation if the updater is not currently active. - if (JobUpdateStateMachine.isActive(status)) { - UpdateFactory.Update update = updates.get(instance.getJobKey()); - try { - evaluateUpdater( - storeProvider, - update, - summary, - ImmutableMap.of( - instance.getInstanceId(), - getActiveInstance( - storeProvider.getTaskStore(), - instance.getJobKey(), - instance.getInstanceId()))); - } catch (UpdateStateException e) { - throw Throwables.propagate(e); - } - } - } - }); + return () -> storage.write((NoResult.Quiet) (MutableStoreProvider storeProvider) -> { + IJobUpdateSummary summary = + getOnlyMatch(storeProvider.getJobUpdateStore(), queryByUpdate(key)); + JobUpdateStatus status = summary.getState().getStatus(); + // Suppress this evaluation if the updater is not currently active. + if (JobUpdateStateMachine.isActive(status)) { + UpdateFactory.Update update = updates.get(instance.getJobKey()); + try { + evaluateUpdater( + storeProvider, + update, + summary, + ImmutableMap.of( + instance.getInstanceId(), + getActiveInstance( + storeProvider.getTaskStore(), + instance.getJobKey(), + instance.getInstanceId()))); + } catch (UpdateStateException e) { + throw Throwables.propagate(e); + } } - }; + }); } private static class PulseHandler { http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateStateMachine.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateStateMachine.java b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateStateMachine.java index 1dbab1e..7ab739a 100644 --- a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateStateMachine.java +++ b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateStateMachine.java @@ -128,12 +128,7 @@ final class JobUpdateStateMachine { .build(); static final Function<JobUpdateStatus, JobUpdateStatus> GET_PAUSE_STATE = - new Function<JobUpdateStatus, JobUpdateStatus>() { - @Override - public JobUpdateStatus apply(JobUpdateStatus status) { - return PAUSE_BEHAVIOR.get(status); - } - }; + PAUSE_BEHAVIOR::get; private static final Map<JobUpdateStatus, JobUpdateStatus> RESUME_ACTIVE_BEHAVIOR = ImmutableMap.<JobUpdateStatus, JobUpdateStatus>builder() @@ -150,28 +145,13 @@ final class JobUpdateStateMachine { .build(); static final Function<JobUpdateStatus, JobUpdateStatus> GET_ACTIVE_RESUME_STATE = - new Function<JobUpdateStatus, JobUpdateStatus>() { - @Override - public JobUpdateStatus apply(JobUpdateStatus status) { - return RESUME_ACTIVE_BEHAVIOR.get(status); - } - }; + RESUME_ACTIVE_BEHAVIOR::get; static final Function<JobUpdateStatus, JobUpdateStatus> GET_BLOCKED_RESUME_STATE = - new Function<JobUpdateStatus, JobUpdateStatus>() { - @Override - public JobUpdateStatus apply(JobUpdateStatus status) { - return RESUME_BLOCKED_BEHAVIOR.get(status); - } - }; + RESUME_BLOCKED_BEHAVIOR::get; static final Function<JobUpdateStatus, JobUpdateStatus> GET_UNBLOCKED_STATE = - new Function<JobUpdateStatus, JobUpdateStatus>() { - @Override - public JobUpdateStatus apply(JobUpdateStatus status) { - return UNBLOCK_BEHAVIOR.get(status); - } - }; + UNBLOCK_BEHAVIOR::get; static JobUpdateStatus getBlockedState(JobUpdateStatus status) { return BLOCK_BEHAVIOR.get(status); http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/updater/OneWayJobUpdater.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/updater/OneWayJobUpdater.java b/src/main/java/org/apache/aurora/scheduler/updater/OneWayJobUpdater.java index 4650611..72ccfa8 100644 --- a/src/main/java/org/apache/aurora/scheduler/updater/OneWayJobUpdater.java +++ b/src/main/java/org/apache/aurora/scheduler/updater/OneWayJobUpdater.java @@ -26,7 +26,6 @@ import com.google.common.base.Predicates; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; -import com.google.common.collect.Maps.EntryTransformer; import org.apache.aurora.common.util.StateMachine; import org.apache.aurora.scheduler.updater.strategy.UpdateStrategy; @@ -82,21 +81,11 @@ class OneWayJobUpdater<K, T> { this.instances = ImmutableMap.copyOf(Maps.transformEntries( instanceEvaluators, - new EntryTransformer<K, StateEvaluator<T>, InstanceUpdate<T>>() { - @Override - public InstanceUpdate<T> transformEntry(K key, StateEvaluator<T> value) { - return new InstanceUpdate<>("Instance " + key, value); - } - })); + (key, value) -> new InstanceUpdate<>("Instance " + key, value))); } private static final Function<InstanceUpdate<?>, SideEffect.InstanceUpdateStatus> GET_STATE = - new Function<InstanceUpdate<?>, SideEffect.InstanceUpdateStatus>() { - @Override - public SideEffect.InstanceUpdateStatus apply(InstanceUpdate<?> manager) { - return manager.getState(); - } - }; + InstanceUpdate::getState; private static <K, T> Set<K> filterByStatus( Map<K, InstanceUpdate<T>> instances,
