http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java b/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java index 1b03f47..9ca5381 100644 --- a/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java +++ b/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java @@ -20,7 +20,6 @@ import java.util.logging.Logger; import javax.inject.Inject; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.protobuf.ByteString; @@ -35,7 +34,6 @@ import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings; import org.apache.aurora.scheduler.storage.entities.IAssignedTask; import org.apache.aurora.scheduler.storage.entities.IDockerContainer; -import org.apache.aurora.scheduler.storage.entities.IDockerParameter; import org.apache.aurora.scheduler.storage.entities.IJobKey; import org.apache.aurora.scheduler.storage.entities.ITaskConfig; import org.apache.mesos.Protos; @@ -156,12 +154,8 @@ public interface MesosTaskFactory { IDockerContainer config = taskConfig.getContainer().getDocker(); Iterable<Protos.Parameter> parameters = Iterables.transform(config.getParameters(), - new Function<IDockerParameter, Protos.Parameter>() { - @Override public Protos.Parameter apply(IDockerParameter item) { - return Protos.Parameter.newBuilder().setKey(item.getName()) - .setValue(item.getValue()).build(); - } - }); + item -> Protos.Parameter.newBuilder().setKey(item.getName()) + .setValue(item.getValue()).build()); ContainerInfo.DockerInfo.Builder dockerBuilder = ContainerInfo.DockerInfo.newBuilder() .setImage(config.getImage()).addAllParameters(parameters);
http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java b/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java index 35c30f1..1b1443d 100644 --- a/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java +++ b/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java @@ -70,12 +70,7 @@ class SchedulerDriverService extends AbstractIdleService implements Driver { @Override protected void startUp() { Optional<String> frameworkId = storage.read( - new Storage.Work.Quiet<Optional<String>>() { - @Override - public Optional<String> apply(Storage.StoreProvider storeProvider) { - return storeProvider.getSchedulerStore().fetchFrameworkId(); - } - }); + storeProvider -> storeProvider.getSchedulerStore().fetchFrameworkId()); LOG.info("Connecting to mesos master: " + driverSettings.getMasterUri()); if (!driverSettings.getCredentials().isPresent()) { http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/metadata/NearestFit.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/metadata/NearestFit.java b/src/main/java/org/apache/aurora/scheduler/metadata/NearestFit.java index 8788f4d..f783e7f 100644 --- a/src/main/java/org/apache/aurora/scheduler/metadata/NearestFit.java +++ b/src/main/java/org/apache/aurora/scheduler/metadata/NearestFit.java @@ -19,7 +19,6 @@ import java.util.Set; import javax.inject.Inject; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; import com.google.common.base.Functions; import com.google.common.base.Ticker; import com.google.common.cache.CacheBuilder; @@ -39,7 +38,6 @@ import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted; import org.apache.aurora.scheduler.events.PubsubEvent.Vetoed; import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto; -import org.apache.aurora.scheduler.storage.entities.ITaskConfig; /** * Tracks vetoes against scheduling decisions and maintains the closest fit among all the vetoes @@ -92,12 +90,7 @@ public class NearestFit implements EventSubscriber { @Subscribe public synchronized void remove(TasksDeleted deletedEvent) { fitByGroupKey.invalidateAll(Iterables.transform(deletedEvent.getTasks(), Functions.compose( - new Function<ITaskConfig, TaskGroupKey>() { - @Override - public TaskGroupKey apply(ITaskConfig task) { - return TaskGroupKey.from(task); - } - }, + TaskGroupKey::from, Tasks::getConfig))); } http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java index 88c9f66..aa22473 100644 --- a/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java +++ b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java @@ -190,12 +190,7 @@ public interface OfferManager extends EventSubscriber { } else { hostOffers.add(offer); executor.execute( - new Runnable() { - @Override - public void run() { - removeAndDecline(offer.getOffer().getId()); - } - }, + () -> removeAndDecline(offer.getOffer().getId()), returnDelay.get()); } } http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/preemptor/BiCache.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/BiCache.java b/src/main/java/org/apache/aurora/scheduler/preemptor/BiCache.java index ffc109e..70390f6 100644 --- a/src/main/java/org/apache/aurora/scheduler/preemptor/BiCache.java +++ b/src/main/java/org/apache/aurora/scheduler/preemptor/BiCache.java @@ -20,7 +20,6 @@ import java.util.concurrent.TimeUnit; import javax.inject.Inject; import com.google.common.base.Optional; -import com.google.common.base.Supplier; import com.google.common.base.Ticker; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; @@ -85,12 +84,7 @@ public class BiCache<K, V> { statsProvider.makeGauge( settings.cacheSizeStatName, - new Supplier<Long>() { - @Override - public Long get() { - return cache.size(); - } - }); + cache::size); } /** http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessor.java b/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessor.java index 5061767..5687bc5 100644 --- a/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessor.java +++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessor.java @@ -116,69 +116,66 @@ public class PendingTaskProcessor implements Runnable { @Override public void run() { metrics.recordTaskProcessorRun(); - storage.read(new Storage.Work.Quiet<Void>() { - @Override - public Void apply(StoreProvider store) { - Multimap<String, PreemptionVictim> slavesToActiveTasks = - clusterState.getSlavesToActiveTasks(); + storage.read(store -> { + Multimap<String, PreemptionVictim> slavesToActiveTasks = + clusterState.getSlavesToActiveTasks(); - if (slavesToActiveTasks.isEmpty()) { - // No preemption victims to consider. - return null; - } - - // Group the offers by slave id so they can be paired with active tasks from the same slave. - Map<String, HostOffer> slavesToOffers = - Maps.uniqueIndex(offerManager.getOffers(), OFFER_TO_SLAVE_ID); - - Set<String> allSlaves = Sets.newHashSet(Iterables.concat( - slavesToOffers.keySet(), - slavesToActiveTasks.keySet())); - - // The algorithm below attempts to find a reservation for every task group by matching - // it against all available slaves until a preemption slot is found. Groups are evaluated - // in a round-robin fashion to ensure fairness (e.g.: G1, G2, G3, G1, G2). - // A slave is removed from further matching once a reservation is made. Similarly, all - // identical task group instances are removed from further iteration if none of the - // available slaves could yield a preemption proposal. A consuming iterator is used for - // task groups to ensure iteration order is preserved after a task group is removed. - LoadingCache<IJobKey, AttributeAggregate> jobStates = attributeCache(store); - List<TaskGroupKey> pendingGroups = fetchIdlePendingGroups(store); - Iterator<TaskGroupKey> groups = Iterators.consumingIterator(pendingGroups.iterator()); - while (!pendingGroups.isEmpty()) { - boolean matched = false; - TaskGroupKey group = groups.next(); - ITaskConfig task = group.getTask(); - - metrics.recordPreemptionAttemptFor(task); - Iterator<String> slaveIterator = allSlaves.iterator(); - while (slaveIterator.hasNext()) { - String slaveId = slaveIterator.next(); - Optional<ImmutableSet<PreemptionVictim>> candidates = - preemptionVictimFilter.filterPreemptionVictims( - task, - slavesToActiveTasks.get(slaveId), - jobStates.getUnchecked(task.getJob()), - Optional.fromNullable(slavesToOffers.get(slaveId)), - store); + if (slavesToActiveTasks.isEmpty()) { + // No preemption victims to consider. + return null; + } - metrics.recordSlotSearchResult(candidates, task); - if (candidates.isPresent()) { - // Slot found -> remove slave to avoid multiple task reservations. - slaveIterator.remove(); - slotCache.put(new PreemptionProposal(candidates.get(), slaveId), group); - matched = true; - break; - } - } - if (!matched) { - // No slot found for the group -> remove group and reset group iterator. - pendingGroups.removeAll(ImmutableSet.of(group)); - groups = Iterators.consumingIterator(pendingGroups.iterator()); + // Group the offers by slave id so they can be paired with active tasks from the same slave. + Map<String, HostOffer> slavesToOffers = + Maps.uniqueIndex(offerManager.getOffers(), OFFER_TO_SLAVE_ID); + + Set<String> allSlaves = Sets.newHashSet(Iterables.concat( + slavesToOffers.keySet(), + slavesToActiveTasks.keySet())); + + // The algorithm below attempts to find a reservation for every task group by matching + // it against all available slaves until a preemption slot is found. Groups are evaluated + // in a round-robin fashion to ensure fairness (e.g.: G1, G2, G3, G1, G2). + // A slave is removed from further matching once a reservation is made. Similarly, all + // identical task group instances are removed from further iteration if none of the + // available slaves could yield a preemption proposal. A consuming iterator is used for + // task groups to ensure iteration order is preserved after a task group is removed. + LoadingCache<IJobKey, AttributeAggregate> jobStates = attributeCache(store); + List<TaskGroupKey> pendingGroups = fetchIdlePendingGroups(store); + Iterator<TaskGroupKey> groups = Iterators.consumingIterator(pendingGroups.iterator()); + while (!pendingGroups.isEmpty()) { + boolean matched = false; + TaskGroupKey group = groups.next(); + ITaskConfig task = group.getTask(); + + metrics.recordPreemptionAttemptFor(task); + Iterator<String> slaveIterator = allSlaves.iterator(); + while (slaveIterator.hasNext()) { + String slaveId = slaveIterator.next(); + Optional<ImmutableSet<PreemptionVictim>> candidates = + preemptionVictimFilter.filterPreemptionVictims( + task, + slavesToActiveTasks.get(slaveId), + jobStates.getUnchecked(task.getJob()), + Optional.fromNullable(slavesToOffers.get(slaveId)), + store); + + metrics.recordSlotSearchResult(candidates, task); + if (candidates.isPresent()) { + // Slot found -> remove slave to avoid multiple task reservations. + slaveIterator.remove(); + slotCache.put(new PreemptionProposal(candidates.get(), slaveId), group); + matched = true; + break; } } - return null; + if (!matched) { + // No slot found for the group -> remove group and reset group iterator. + pendingGroups.removeAll(ImmutableSet.of(group)); + groups = Iterators.consumingIterator(pendingGroups.iterator()); + } } + return null; }); } @@ -225,12 +222,7 @@ public class PendingTaskProcessor implements Runnable { } private static final Function<IAssignedTask, TaskGroupKey> ASSIGNED_TO_GROUP_KEY = - new Function<IAssignedTask, TaskGroupKey>() { - @Override - public TaskGroupKey apply(IAssignedTask task) { - return TaskGroupKey.from(task.getTask()); - } - }; + task -> TaskGroupKey.from(task.getTask()); private final Predicate<IScheduledTask> hasCachedSlot = new Predicate<IScheduledTask>() { @Override @@ -248,10 +240,5 @@ public class PendingTaskProcessor implements Runnable { }; private static final Function<HostOffer, String> OFFER_TO_SLAVE_ID = - new Function<HostOffer, String>() { - @Override - public String apply(HostOffer offer) { - return offer.getOffer().getSlaveId().getValue(); - } - }; + offer -> offer.getOffer().getSlaveId().getValue(); } http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java index edfa202..7f84e90 100644 --- a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java +++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java @@ -97,28 +97,13 @@ public interface PreemptionVictimFilter { } private static final Function<HostOffer, ResourceSlot> OFFER_TO_RESOURCE_SLOT = - new Function<HostOffer, ResourceSlot>() { - @Override - public ResourceSlot apply(HostOffer offer) { - return Resources.from(offer.getOffer()).filter(Resources.NON_REVOCABLE).slot(); - } - }; + offer -> Resources.from(offer.getOffer()).filter(Resources.NON_REVOCABLE).slot(); private static final Function<HostOffer, String> OFFER_TO_HOST = - new Function<HostOffer, String>() { - @Override - public String apply(HostOffer offer) { - return offer.getOffer().getHostname(); - } - }; + offer -> offer.getOffer().getHostname(); private static final Function<PreemptionVictim, String> VICTIM_TO_HOST = - new Function<PreemptionVictim, String>() { - @Override - public String apply(PreemptionVictim victim) { - return victim.getSlaveHost(); - } - }; + PreemptionVictim::getSlaveHost; private final Function<PreemptionVictim, ResourceSlot> victimToResources = new Function<PreemptionVictim, ResourceSlot>() { @@ -200,24 +185,21 @@ public interface PreemptionVictimFilter { * with {@code preemptableTask}. */ private static Predicate<PreemptionVictim> preemptionFilter(final ITaskConfig pendingTask) { - return new Predicate<PreemptionVictim>() { - @Override - public boolean apply(PreemptionVictim possibleVictim) { - boolean pendingIsProduction = pendingTask.isProduction(); - boolean victimIsProduction = possibleVictim.isProduction(); - - if (pendingIsProduction && !victimIsProduction) { - return true; - } else if (pendingIsProduction == victimIsProduction) { - // If production flags are equal, preemption is based on priority within the same role. - if (pendingTask.getJob().getRole().equals(possibleVictim.getRole())) { - return pendingTask.getPriority() > possibleVictim.getPriority(); - } else { - return false; - } + return possibleVictim -> { + boolean pendingIsProduction = pendingTask.isProduction(); + boolean victimIsProduction = possibleVictim.isProduction(); + + if (pendingIsProduction && !victimIsProduction) { + return true; + } else if (pendingIsProduction == victimIsProduction) { + // If production flags are equal, preemption is based on priority within the same role. + if (pendingTask.getJob().getRole().equals(possibleVictim.getRole())) { + return pendingTask.getPriority() > possibleVictim.getPriority(); } else { return false; } + } else { + return false; } }; } http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptorModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptorModule.java b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptorModule.java index fc9dac8..d108742 100644 --- a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptorModule.java +++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptorModule.java @@ -32,10 +32,7 @@ import org.apache.aurora.common.quantity.Time; import org.apache.aurora.scheduler.SchedulerServicesModule; import org.apache.aurora.scheduler.base.TaskGroupKey; import org.apache.aurora.scheduler.events.PubsubEventModule; -import org.apache.aurora.scheduler.filter.AttributeAggregate; import org.apache.aurora.scheduler.preemptor.BiCache.BiCacheSettings; -import org.apache.aurora.scheduler.storage.Storage; -import org.apache.aurora.scheduler.storage.entities.IAssignedTask; import static java.util.Objects.requireNonNull; @@ -154,14 +151,6 @@ public class PreemptorModule extends AbstractModule { } } - private static final Preemptor NULL_PREEMPTOR = new Preemptor() { - @Override - public Optional<String> attemptPreemptionFor( - IAssignedTask task, - AttributeAggregate jobState, - Storage.MutableStoreProvider storeProvider) { - - return Optional.absent(); - } - }; + private static final Preemptor NULL_PREEMPTOR = + (task, jobState, storeProvider) -> Optional.absent(); } http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/pruning/JobUpdateHistoryPruner.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/pruning/JobUpdateHistoryPruner.java b/src/main/java/org/apache/aurora/scheduler/pruning/JobUpdateHistoryPruner.java index 96393eb..f29b7cf 100644 --- a/src/main/java/org/apache/aurora/scheduler/pruning/JobUpdateHistoryPruner.java +++ b/src/main/java/org/apache/aurora/scheduler/pruning/JobUpdateHistoryPruner.java @@ -28,7 +28,7 @@ import org.apache.aurora.common.quantity.Time; import org.apache.aurora.common.util.Clock; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; -import org.apache.aurora.scheduler.storage.Storage.MutateWork; +import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey; import static java.util.Objects.requireNonNull; @@ -76,23 +76,15 @@ class JobUpdateHistoryPruner extends AbstractIdleService { @Override protected void startUp() { executor.scheduleAtFixedRate( - new Runnable() { - @Override - public void run() { - storage.write(new MutateWork.NoResult.Quiet() { - @Override - public void execute(MutableStoreProvider storeProvider) { - Set<IJobUpdateKey> prunedUpdates = storeProvider.getJobUpdateStore().pruneHistory( - settings.maxUpdatesPerJob, - clock.nowMillis() - settings.maxHistorySize.as(Time.MILLISECONDS)); + () -> storage.write((NoResult.Quiet) (MutableStoreProvider storeProvider) -> { + Set<IJobUpdateKey> prunedUpdates = storeProvider.getJobUpdateStore().pruneHistory( + settings.maxUpdatesPerJob, + clock.nowMillis() - settings.maxHistorySize.as(Time.MILLISECONDS)); - LOG.info(prunedUpdates.isEmpty() - ? "No job update history to prune." - : "Pruned job update history: " + Joiner.on(",").join(prunedUpdates)); - } - }); - } - }, + LOG.info(prunedUpdates.isEmpty() + ? "No job update history to prune." + : "Pruned job update history: " + Joiner.on(",").join(prunedUpdates)); + }), settings.pruneInterval.as(Time.MILLISECONDS), settings.pruneInterval.as(Time.MILLISECONDS), TimeUnit.MILLISECONDS); http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java b/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java index bb1fc8b..d1108a3 100644 --- a/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java +++ b/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java @@ -35,6 +35,7 @@ import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.state.StateManager; import org.apache.aurora.scheduler.storage.Storage; +import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; import org.apache.aurora.scheduler.storage.entities.IJobKey; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; @@ -122,12 +123,8 @@ public class TaskHistoryPruner implements EventSubscriber { private void deleteTasks(final Set<String> taskIds) { LOG.info("Pruning inactive tasks " + taskIds); - storage.write(new Storage.MutateWork.NoResult.Quiet() { - @Override - public void execute(Storage.MutableStoreProvider storeProvider) { - stateManager.deleteTasks(storeProvider, taskIds); - } - }); + storage.write( + (NoResult.Quiet) storeProvider -> stateManager.deleteTasks(storeProvider, taskIds)); } @VisibleForTesting @@ -142,32 +139,26 @@ public class TaskHistoryPruner implements EventSubscriber { LOG.fine("Prune task " + taskId + " in " + timeRemaining + " ms."); executor.execute( - new Runnable() { - @Override - public void run() { - LOG.info("Pruning expired inactive task " + taskId); - deleteTasks(ImmutableSet.of(taskId)); - } + () -> { + LOG.info("Pruning expired inactive task " + taskId); + deleteTasks(ImmutableSet.of(taskId)); }, Amount.of(timeRemaining, Time.MILLISECONDS)); - executor.execute(new Runnable() { - @Override - public void run() { - Iterable<IScheduledTask> inactiveTasks = - Storage.Util.fetchTasks(storage, jobHistoryQuery(jobKey)); - int numInactiveTasks = Iterables.size(inactiveTasks); - int tasksToPrune = numInactiveTasks - settings.perJobHistoryGoal; - if (tasksToPrune > 0 && numInactiveTasks > settings.perJobHistoryGoal) { - Set<String> toPrune = FluentIterable - .from(Tasks.LATEST_ACTIVITY.sortedCopy(inactiveTasks)) - .filter(safeToDelete) - .limit(tasksToPrune) - .transform(Tasks::id) - .toSet(); - if (!toPrune.isEmpty()) { - deleteTasks(toPrune); - } + executor.execute(() -> { + Iterable<IScheduledTask> inactiveTasks = + Storage.Util.fetchTasks(storage, jobHistoryQuery(jobKey)); + int numInactiveTasks = Iterables.size(inactiveTasks); + int tasksToPrune = numInactiveTasks - settings.perJobHistoryGoal; + if (tasksToPrune > 0 && numInactiveTasks > settings.perJobHistoryGoal) { + Set<String> toPrune = FluentIterable + .from(Tasks.LATEST_ACTIVITY.sortedCopy(inactiveTasks)) + .filter(safeToDelete) + .limit(tasksToPrune) + .transform(Tasks::id) + .toSet(); + if (!toPrune.isEmpty()) { + deleteTasks(toPrune); } } }); http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java b/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java index a12910e..c18836a 100644 --- a/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java +++ b/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java @@ -358,37 +358,29 @@ public interface QuotaManager { final Multimap<IJobKey, ITaskConfig> taskConfigsByKey = tasks.index(ITaskConfig::getJob); return addAll(Iterables.transform( cronTemplates, - new Function<IJobConfiguration, IResourceAggregate>() { - @Override - public IResourceAggregate apply(IJobConfiguration config) { - return max( - scale(config.getTaskConfig(), config.getInstanceCount()), - fromTasks(taskConfigsByKey.get(config.getKey()))); - } - })); + config -> max( + scale(config.getTaskConfig(), config.getInstanceCount()), + fromTasks(taskConfigsByKey.get(config.getKey()))))); } private static Predicate<IAssignedTask> buildNonUpdatingTasksFilter( final Map<IJobKey, IJobUpdateInstructions> roleJobUpdates) { - return new Predicate<IAssignedTask>() { - @Override - public boolean apply(IAssignedTask task) { - Optional<IJobUpdateInstructions> update = Optional.fromNullable( - roleJobUpdates.get(task.getTask().getJob())); - - if (update.isPresent()) { - IJobUpdateInstructions instructions = update.get(); - RangeSet<Integer> initialInstances = getInstanceIds(instructions.getInitialState()); - RangeSet<Integer> desiredInstances = getInstanceIds(instructions.isSetDesiredState() - ? ImmutableSet.of(instructions.getDesiredState()) - : ImmutableSet.of()); - - int instanceId = task.getInstanceId(); - return !initialInstances.contains(instanceId) && !desiredInstances.contains(instanceId); - } - return true; + return task -> { + Optional<IJobUpdateInstructions> update = Optional.fromNullable( + roleJobUpdates.get(task.getTask().getJob())); + + if (update.isPresent()) { + IJobUpdateInstructions instructions = update.get(); + RangeSet<Integer> initialInstances = getInstanceIds(instructions.getInitialState()); + RangeSet<Integer> desiredInstances = getInstanceIds(instructions.isSetDesiredState() + ? ImmutableSet.of(instructions.getDesiredState()) + : ImmutableSet.of()); + + int instanceId = task.getInstanceId(); + return !initialInstances.contains(instanceId) && !desiredInstances.contains(instanceId); } + return true; }; } @@ -397,12 +389,7 @@ public interface QuotaManager { String role) { Function<IJobUpdateSummary, IJobUpdate> fetchUpdate = - new Function<IJobUpdateSummary, IJobUpdate>() { - @Override - public IJobUpdate apply(IJobUpdateSummary summary) { - return jobUpdateStore.fetchJobUpdate(summary.getKey()).get(); - } - }; + summary -> jobUpdateStore.fetchJobUpdate(summary.getKey()).get(); return Maps.transformValues( FluentIterable.from(jobUpdateStore.fetchJobUpdateSummaries(updateQuery(role))) @@ -419,23 +406,13 @@ public interface QuotaManager { } private static final Function<ITaskConfig, IResourceAggregate> CONFIG_RESOURCES = - new Function<ITaskConfig, IResourceAggregate>() { - @Override - public IResourceAggregate apply(ITaskConfig config) { - return IResourceAggregate.build(new ResourceAggregate() - .setNumCpus(config.getNumCpus()) - .setRamMb(config.getRamMb()) - .setDiskMb(config.getDiskMb())); - } - }; + config -> IResourceAggregate.build(new ResourceAggregate() + .setNumCpus(config.getNumCpus()) + .setRamMb(config.getRamMb()) + .setDiskMb(config.getDiskMb())); private static final Function<IInstanceTaskConfig, IResourceAggregate> INSTANCE_RESOURCES = - new Function<IInstanceTaskConfig, IResourceAggregate>() { - @Override - public IResourceAggregate apply(IInstanceTaskConfig config) { - return scale(config.getTask(), getUpdateInstanceCount(config.getInstances())); - } - }; + config -> scale(config.getTask(), getUpdateInstanceCount(config.getInstances())); private static IResourceAggregate instructionsToResources( Iterable<IInstanceTaskConfig> instructions) { @@ -459,20 +436,17 @@ public interface QuotaManager { private static Function<IJobUpdateInstructions, IResourceAggregate> updateResources( final Predicate<IInstanceTaskConfig> instanceFilter) { - return new Function<IJobUpdateInstructions, IResourceAggregate>() { - @Override - public IResourceAggregate apply(IJobUpdateInstructions instructions) { - Iterable<IInstanceTaskConfig> initialState = - Iterables.filter(instructions.getInitialState(), instanceFilter); - Iterable<IInstanceTaskConfig> desiredState = Iterables.filter( - Optional.fromNullable(instructions.getDesiredState()).asSet(), - instanceFilter); - - // Calculate result as max(existing, desired) per resource type. - return max( - instructionsToResources(initialState), - instructionsToResources(desiredState)); - } + return instructions -> { + Iterable<IInstanceTaskConfig> initialState = + Iterables.filter(instructions.getInitialState(), instanceFilter); + Iterable<IInstanceTaskConfig> desiredState = Iterables.filter( + Optional.fromNullable(instructions.getDesiredState()).asSet(), + instanceFilter); + + // Calculate result as max(existing, desired) per resource type. + return max( + instructionsToResources(initialState), + instructionsToResources(desiredState)); }; } @@ -518,12 +492,7 @@ public interface QuotaManager { } private static final Function<IJobUpdate, IJobKey> UPDATE_TO_JOB_KEY = - new Function<IJobUpdate, IJobKey>() { - @Override - public IJobKey apply(IJobUpdate input) { - return input.getSummary().getKey().getJob(); - } - }; + input -> input.getSummary().getKey().getJob(); private static int getUpdateInstanceCount(Set<IRange> ranges) { int instanceCount = 0; http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskReconciler.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskReconciler.java b/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskReconciler.java index c797914..cb5c93e 100644 --- a/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskReconciler.java +++ b/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskReconciler.java @@ -107,19 +107,16 @@ public class TaskReconciler extends AbstractIdleService { protected void startUp() { // Schedule explicit reconciliation. executor.scheduleAtFixedRate( - new Runnable() { - @Override - public void run() { - ImmutableSet<Protos.TaskStatus> active = FluentIterable - .from(Storage.Util.fetchTasks( - storage, - Query.unscoped().byStatus(Tasks.SLAVE_ASSIGNED_STATES))) - .transform(TASK_TO_PROTO) - .toSet(); - - driver.reconcileTasks(active); - explicitRuns.incrementAndGet(); - } + () -> { + ImmutableSet<Protos.TaskStatus> active = FluentIterable + .from(Storage.Util.fetchTasks( + storage, + Query.unscoped().byStatus(Tasks.SLAVE_ASSIGNED_STATES))) + .transform(TASK_TO_PROTO) + .toSet(); + + driver.reconcileTasks(active); + explicitRuns.incrementAndGet(); }, settings.explicitDelayMinutes, settings.explicitInterval.as(MINUTES), @@ -127,12 +124,9 @@ public class TaskReconciler extends AbstractIdleService { // Schedule implicit reconciliation. executor.scheduleAtFixedRate( - new Runnable() { - @Override - public void run() { - driver.reconcileTasks(ImmutableSet.of()); - implicitRuns.incrementAndGet(); - } + () -> { + driver.reconcileTasks(ImmutableSet.of()); + implicitRuns.incrementAndGet(); }, settings.implicitDelayMinutes, settings.implicitInterval.as(MINUTES), http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskTimeout.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskTimeout.java b/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskTimeout.java index bfe094b..7c09f7c 100644 --- a/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskTimeout.java +++ b/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskTimeout.java @@ -39,8 +39,6 @@ import org.apache.aurora.scheduler.storage.Storage; import static java.util.Objects.requireNonNull; -import static org.apache.aurora.scheduler.storage.Storage.MutateWork; - /** * Observes task transitions and identifies tasks that are 'stuck' in a transient state. Stuck * tasks will be transitioned to the LOST state. @@ -118,17 +116,12 @@ class TaskTimeout extends AbstractIdleService implements EventSubscriber { // canceled, but in the event of a state transition race, including transientState // prevents an unintended task timeout. // Note: This requires LOST transitions trigger Driver.killTask. - StateChangeResult result = storage.write(new MutateWork.Quiet<StateChangeResult>() { - @Override - public StateChangeResult apply(Storage.MutableStoreProvider storeProvider) { - return stateManager.changeState( - storeProvider, - taskId, - Optional.of(newState), - ScheduleStatus.LOST, - TIMEOUT_MESSAGE); - } - }); + StateChangeResult result = storage.write(storeProvider -> stateManager.changeState( + storeProvider, + taskId, + Optional.of(newState), + ScheduleStatus.LOST, + TIMEOUT_MESSAGE)); if (result == StateChangeResult.SUCCESS) { LOG.info("Timeout reached for task " + taskId + ":" + taskId); http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java index 66d5a10..c044ebe 100644 --- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java +++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java @@ -100,12 +100,9 @@ public class TaskGroups implements EventSubscriber { this.backoff = requireNonNull(settings.taskGroupBackoff); this.rescheduleCalculator = requireNonNull(rescheduleCalculator); - this.taskScheduler = new TaskScheduler() { - @Override - public boolean schedule(String taskId) { - settings.rateLimiter.acquire(); - return taskScheduler.schedule(taskId); - } + this.taskScheduler = taskId -> { + settings.rateLimiter.acquire(); + return taskScheduler.schedule(taskId); }; } http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java index f1b11d6..7930c6c 100644 --- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java +++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java @@ -40,7 +40,6 @@ import org.apache.aurora.scheduler.preemptor.Preemptor; import org.apache.aurora.scheduler.state.TaskAssigner; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; -import org.apache.aurora.scheduler.storage.Storage.MutateWork; import org.apache.aurora.scheduler.storage.entities.IAssignedTask; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.aurora.scheduler.storage.entities.ITaskConfig; @@ -111,12 +110,7 @@ public interface TaskScheduler extends EventSubscriber { public boolean schedule(final String taskId) { attemptsFired.incrementAndGet(); try { - return storage.write(new MutateWork.Quiet<Boolean>() { - @Override - public Boolean apply(MutableStoreProvider store) { - return scheduleTask(store, taskId); - } - }); + return storage.write(store -> scheduleTask(store, taskId)); } catch (RuntimeException e) { // We catch the generic unchecked exception here to ensure tasks are not abandoned // if there is a transient issue resulting in an unchecked exception. http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java index 837bab7..787309a 100644 --- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java +++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java @@ -74,22 +74,17 @@ class TaskThrottler implements EventSubscriber { long delayMs = Math.max(0, readyAtMs - clock.nowMillis()); throttleStats.accumulate(delayMs); executor.execute( - new Runnable() { + () -> storage.write(new Storage.MutateWork.NoResult.Quiet() { @Override - public void run() { - storage.write(new Storage.MutateWork.NoResult.Quiet() { - @Override - public void execute(Storage.MutableStoreProvider storeProvider) { - stateManager.changeState( - storeProvider, - stateChange.getTaskId(), - Optional.of(THROTTLED), - PENDING, - Optional.absent()); - } - }); + public void execute(Storage.MutableStoreProvider storeProvider) { + stateManager.changeState( + storeProvider, + stateChange.getTaskId(), + Optional.of(THROTTLED), + PENDING, + Optional.absent()); } - }, + }), Amount.of(delayMs, Time.MILLISECONDS)); } } http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/sla/MetricCalculator.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/sla/MetricCalculator.java b/src/main/java/org/apache/aurora/scheduler/sla/MetricCalculator.java index 54fa45a..3ddac8b 100644 --- a/src/main/java/org/apache/aurora/scheduler/sla/MetricCalculator.java +++ b/src/main/java/org/apache/aurora/scheduler/sla/MetricCalculator.java @@ -103,12 +103,7 @@ class MetricCalculator implements Runnable { } private static final Predicate<ITaskConfig> IS_SERVICE = - new Predicate<ITaskConfig>() { - @Override - public boolean apply(ITaskConfig task) { - return task.isIsService(); - } - }; + ITaskConfig::isIsService; private final LoadingCache<String, Counter> metricCache; private final Storage storage; http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/sla/SlaAlgorithm.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/sla/SlaAlgorithm.java b/src/main/java/org/apache/aurora/scheduler/sla/SlaAlgorithm.java index 88b2d10..4f243aa 100644 --- a/src/main/java/org/apache/aurora/scheduler/sla/SlaAlgorithm.java +++ b/src/main/java/org/apache/aurora/scheduler/sla/SlaAlgorithm.java @@ -160,12 +160,7 @@ interface SlaAlgorithm { IScheduledTask::getStatus); private static final Function<IScheduledTask, ITaskEvent> TASK_TO_EVENT = - new Function<IScheduledTask, ITaskEvent>() { - @Override - public ITaskEvent apply(IScheduledTask task) { - return Tasks.getLatestEvent(task); - } - }; + Tasks::getLatestEvent; private JobUptime(float percentile) { this.percentile = percentile; @@ -176,12 +171,7 @@ interface SlaAlgorithm { List<Long> uptimes = FluentIterable.from(tasks) .filter(IS_RUNNING) .transform(Functions.compose( - new Function<ITaskEvent, Long>() { - @Override - public Long apply(ITaskEvent event) { - return timeFrame.upperEndpoint() - event.getTimestamp(); - } - }, + event -> timeFrame.upperEndpoint() - event.getTimestamp(), TASK_TO_EVENT)).toList(); return (int) Math.floor((double) SlaUtil.percentile(uptimes, percentile) / 1000); @@ -278,102 +268,86 @@ interface SlaAlgorithm { } private static final Function<IScheduledTask, InstanceId> TO_ID = - new Function<IScheduledTask, InstanceId>() { - @Override - public InstanceId apply(IScheduledTask task) { - return new InstanceId( - task.getAssignedTask().getTask().getJob(), - task.getAssignedTask().getInstanceId()); - } - }; + task -> new InstanceId( + task.getAssignedTask().getTask().getJob(), + task.getAssignedTask().getInstanceId()); private static final Function<ITaskEvent, Long> TASK_EVENT_TO_TIMESTAMP = - new Function<ITaskEvent, Long>() { - @Override - public Long apply(ITaskEvent taskEvent) { - return taskEvent.getTimestamp(); - } - }; + ITaskEvent::getTimestamp; /** * Combine all task events per given instance into the unified sorted instance history view. */ private static final Function<Collection<IScheduledTask>, List<ITaskEvent>> TO_SORTED_EVENTS = - new Function<Collection<IScheduledTask>, List<ITaskEvent>>() { - @Override - public List<ITaskEvent> apply(Collection<IScheduledTask> tasks) { - List<ITaskEvent> result = Lists.newLinkedList(); - for (IScheduledTask task : tasks) { - result.addAll(task.getTaskEvents()); - } - - return Ordering.natural() - .onResultOf(TASK_EVENT_TO_TIMESTAMP).immutableSortedCopy(result); + tasks -> { + List<ITaskEvent> result = Lists.newLinkedList(); + for (IScheduledTask task : tasks) { + result.addAll(task.getTaskEvents()); } + + return Ordering.natural() + .onResultOf(TASK_EVENT_TO_TIMESTAMP).immutableSortedCopy(result); }; /** * Convert instance history into the {@link SlaState} based {@link Interval} list. */ private static final Function<List<ITaskEvent>, List<Interval>> TASK_EVENTS_TO_INTERVALS = - new Function<List<ITaskEvent>, List<Interval>>() { - @Override - public List<Interval> apply(List<ITaskEvent> events) { - - ImmutableList.Builder<Interval> intervals = ImmutableList.builder(); - Pair<SlaState, Long> current = Pair.of(SlaState.REMOVED, 0L); - - for (ITaskEvent event : events) { - long timestamp = event.getTimestamp(); - - // Event status in the instance timeline signifies either of the following: - // - termination of the existing SlaState interval AND start of a new one; - // - continuation of the existing matching SlaState interval. - switch (event.getStatus()) { - case LOST: - case DRAINING: - case PREEMPTING: - current = updateIntervals(timestamp, SlaState.DOWN, current, intervals); - break; - - case PENDING: - case ASSIGNED: - case STARTING: - if (current.getFirst() != SlaState.DOWN) { - current = updateIntervals(timestamp, SlaState.REMOVED, current, intervals); - } - break; - - case THROTTLED: - case FINISHED: - case RESTARTING: - case FAILED: - case KILLING: + events -> { + + ImmutableList.Builder<Interval> intervals = ImmutableList.builder(); + Pair<SlaState, Long> current = Pair.of(SlaState.REMOVED, 0L); + + for (ITaskEvent event : events) { + long timestamp = event.getTimestamp(); + + // Event status in the instance timeline signifies either of the following: + // - termination of the existing SlaState interval AND start of a new one; + // - continuation of the existing matching SlaState interval. + switch (event.getStatus()) { + case LOST: + case DRAINING: + case PREEMPTING: + current = updateIntervals(timestamp, SlaState.DOWN, current, intervals); + break; + + case PENDING: + case ASSIGNED: + case STARTING: + if (current.getFirst() != SlaState.DOWN) { current = updateIntervals(timestamp, SlaState.REMOVED, current, intervals); - break; - - case RUNNING: - current = updateIntervals(timestamp, SlaState.UP, current, intervals); - break; - - case KILLED: - if (current.getFirst() == SlaState.UP) { - current = updateIntervals(timestamp, SlaState.DOWN, current, intervals); - } - break; + } + break; + + case THROTTLED: + case FINISHED: + case RESTARTING: + case FAILED: + case KILLING: + current = updateIntervals(timestamp, SlaState.REMOVED, current, intervals); + break; + + case RUNNING: + current = updateIntervals(timestamp, SlaState.UP, current, intervals); + break; + + case KILLED: + if (current.getFirst() == SlaState.UP) { + current = updateIntervals(timestamp, SlaState.DOWN, current, intervals); + } + break; - case INIT: - // Ignore. - break; + case INIT: + // Ignore. + break; - default: - throw new IllegalArgumentException("Unsupported status:" + event.getStatus()); - } + default: + throw new IllegalArgumentException("Unsupported status:" + event.getStatus()); } - // Add the last event interval. - intervals.add(new Interval(current.getFirst(), current.getSecond(), Long.MAX_VALUE)); - return intervals.build(); } + // Add the last event interval. + intervals.add(new Interval(current.getFirst(), current.getSecond(), Long.MAX_VALUE)); + return intervals.build(); }; private static Pair<SlaState, Long> updateIntervals( http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/sla/SlaGroup.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/sla/SlaGroup.java b/src/main/java/org/apache/aurora/scheduler/sla/SlaGroup.java index 4827a0d..bf7c084 100644 --- a/src/main/java/org/apache/aurora/scheduler/sla/SlaGroup.java +++ b/src/main/java/org/apache/aurora/scheduler/sla/SlaGroup.java @@ -17,7 +17,6 @@ import java.util.Map; import com.google.common.base.Function; import com.google.common.base.Functions; -import com.google.common.base.Predicate; import com.google.common.collect.ImmutableListMultimap; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; @@ -62,12 +61,7 @@ interface SlaGroup { "sla_cpu_large_", Range.openClosed(MEDIUM.getNumCpus(), LARGE.getNumCpus()), "sla_cpu_xlarge_", Range.openClosed(LARGE.getNumCpus(), XLARGE.getNumCpus()), "sla_cpu_xxlarge_", Range.greaterThan(XLARGE.getNumCpus())), - new Function<IScheduledTask, Double>() { - @Override - public Double apply(IScheduledTask task) { - return task.getAssignedTask().getTask().getNumCpus(); - } - } + task -> task.getAssignedTask().getTask().getNumCpus() )), RESOURCE_RAM(new Resource<>( ImmutableMap.of( @@ -76,12 +70,7 @@ interface SlaGroup { "sla_ram_large_", Range.openClosed(MEDIUM.getRamMb(), LARGE.getRamMb()), "sla_ram_xlarge_", Range.openClosed(LARGE.getRamMb(), XLARGE.getRamMb()), "sla_ram_xxlarge_", Range.greaterThan(XLARGE.getRamMb())), - new Function<IScheduledTask, Long>() { - @Override - public Long apply(IScheduledTask task) { - return task.getAssignedTask().getTask().getRamMb(); - } - } + task -> task.getAssignedTask().getTask().getRamMb() )), RESOURCE_DISK(new Resource<>( ImmutableMap.of( @@ -90,12 +79,7 @@ interface SlaGroup { "sla_disk_large_", Range.openClosed(MEDIUM.getDiskMb(), LARGE.getDiskMb()), "sla_disk_xlarge_", Range.openClosed(LARGE.getDiskMb(), XLARGE.getDiskMb()), "sla_disk_xxlarge_", Range.greaterThan(XLARGE.getDiskMb())), - new Function<IScheduledTask, Long>() { - @Override - public Long apply(IScheduledTask task) { - return task.getAssignedTask().getTask().getDiskMb(); - } - } + task -> task.getAssignedTask().getTask().getDiskMb() )); private SlaGroup group; @@ -129,12 +113,7 @@ interface SlaGroup { class Cluster implements SlaGroup { @Override public Multimap<String, IScheduledTask> createNamedGroups(Iterable<IScheduledTask> tasks) { - return Multimaps.index(tasks, new Function<IScheduledTask, String>() { - @Override - public String apply(IScheduledTask task) { - return "sla_cluster_"; - } - }); + return Multimaps.index(tasks, task -> "sla_cluster_"); } } @@ -159,11 +138,8 @@ interface SlaGroup { ImmutableListMultimap.builder(); for (final Map.Entry<String, Range<T>> entry : map.entrySet()) { - result.putAll(entry.getKey(), Iterables.filter(tasks, new Predicate<IScheduledTask>() { - @Override - public boolean apply(IScheduledTask task) { - return entry.getValue().contains(function.apply(task)); - } + result.putAll(entry.getKey(), Iterables.filter(tasks, task -> { + return entry.getValue().contains(function.apply(task)); })); } return result.build(); http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java index 7660022..59c9786 100644 --- a/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java +++ b/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java @@ -26,10 +26,7 @@ import org.apache.aurora.gen.LockKey._Fields; import org.apache.aurora.scheduler.base.JobKeys; import org.apache.aurora.scheduler.storage.LockStore; import org.apache.aurora.scheduler.storage.Storage; -import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; -import org.apache.aurora.scheduler.storage.Storage.MutateWork; -import org.apache.aurora.scheduler.storage.Storage.StoreProvider; -import org.apache.aurora.scheduler.storage.Storage.Work; +import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; import org.apache.aurora.scheduler.storage.entities.ILock; import org.apache.aurora.scheduler.storage.entities.ILockKey; @@ -54,54 +51,42 @@ public class LockManagerImpl implements LockManager { @Override public ILock acquireLock(final ILockKey lockKey, final String user) throws LockException { - return storage.write(new MutateWork<ILock, LockException>() { - @Override - public ILock apply(Storage.MutableStoreProvider storeProvider) - throws LockException { - - LockStore.Mutable lockStore = storeProvider.getLockStore(); - Optional<ILock> existingLock = lockStore.fetchLock(lockKey); - - if (existingLock.isPresent()) { - throw new LockException(String.format( - "Operation for: %s is already in progress. Started at: %s. Current owner: %s.", - formatLockKey(lockKey), - new Date(existingLock.get().getTimestampMs()).toString(), - existingLock.get().getUser())); - } - - ILock lock = ILock.build(new Lock() - .setKey(lockKey.newBuilder()) - .setToken(tokenGenerator.createNew().toString()) - .setTimestampMs(clock.nowMillis()) - .setUser(user)); - - lockStore.saveLock(lock); - return lock; + return storage.write(storeProvider -> { + + LockStore.Mutable lockStore = storeProvider.getLockStore(); + Optional<ILock> existingLock = lockStore.fetchLock(lockKey); + + if (existingLock.isPresent()) { + throw new LockException(String.format( + "Operation for: %s is already in progress. Started at: %s. Current owner: %s.", + formatLockKey(lockKey), + new Date(existingLock.get().getTimestampMs()).toString(), + existingLock.get().getUser())); } + + ILock lock = ILock.build(new Lock() + .setKey(lockKey.newBuilder()) + .setToken(tokenGenerator.createNew().toString()) + .setTimestampMs(clock.nowMillis()) + .setUser(user)); + + lockStore.saveLock(lock); + return lock; }); } @Override public void releaseLock(final ILock lock) { - storage.write(new MutateWork.NoResult.Quiet() { - @Override - public void execute(MutableStoreProvider storeProvider) { - storeProvider.getLockStore().removeLock(lock.getKey()); - } - }); + storage.write( + (NoResult.Quiet) storeProvider -> storeProvider.getLockStore().removeLock(lock.getKey())); } @Override public void validateIfLocked(final ILockKey context, Optional<ILock> heldLock) throws LockException { - Optional<ILock> stored = storage.read(new Work.Quiet<Optional<ILock>>() { - @Override - public Optional<ILock> apply(StoreProvider storeProvider) { - return storeProvider.getLockStore().fetchLock(context); - } - }); + Optional<ILock> stored = storage.read( + storeProvider -> storeProvider.getLockStore().fetchLock(context)); // The implementation below assumes the following use cases: // +-----------+-----------------+----------+ @@ -125,12 +110,7 @@ public class LockManagerImpl implements LockManager { @Override public Iterable<ILock> getLocks() { - return storage.read(new Work.Quiet<Iterable<ILock>>() { - @Override - public Iterable<ILock> apply(StoreProvider storeProvider) { - return storeProvider.getLockStore().fetchLocks(); - } - }); + return storage.read(storeProvider -> storeProvider.getLockStore().fetchLocks()); } private static String formatLockKey(ILockKey lockKey) { http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java b/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java index 617ee54..60ebfdf 100644 --- a/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java +++ b/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java @@ -38,9 +38,7 @@ import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; import org.apache.aurora.scheduler.storage.AttributeStore; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; -import org.apache.aurora.scheduler.storage.Storage.MutateWork; -import org.apache.aurora.scheduler.storage.Storage.StoreProvider; -import org.apache.aurora.scheduler.storage.Storage.Work; +import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; @@ -153,24 +151,21 @@ public interface MaintenanceController { public void taskChangedState(final TaskStateChange change) { if (Tasks.isTerminated(change.getNewState())) { final String host = change.getTask().getAssignedTask().getSlaveHost(); - storage.write(new MutateWork.NoResult.Quiet() { - @Override - public void execute(MutableStoreProvider store) { - // If the task _was_ associated with a draining host, and it was the last task on the - // host. - Optional<IHostAttributes> attributes = - store.getAttributeStore().getHostAttributes(host); - if (attributes.isPresent() && attributes.get().getMode() == DRAINING) { - Query.Builder builder = Query.slaveScoped(host).active(); - Iterable<IScheduledTask> activeTasks = store.getTaskStore().fetchTasks(builder); - if (Iterables.isEmpty(activeTasks)) { - LOG.info(String.format("Moving host %s into DRAINED", host)); - setMaintenanceMode(store, ImmutableSet.of(host), DRAINED); - } else { - LOG.info(String.format("Host %s is DRAINING with active tasks: %s", - host, - Tasks.ids(activeTasks))); - } + storage.write((NoResult.Quiet) (MutableStoreProvider store) -> { + // If the task _was_ associated with a draining host, and it was the last task on the + // host. + Optional<IHostAttributes> attributes = + store.getAttributeStore().getHostAttributes(host); + if (attributes.isPresent() && attributes.get().getMode() == DRAINING) { + Query.Builder builder = Query.slaveScoped(host).active(); + Iterable<IScheduledTask> activeTasks = store.getTaskStore().fetchTasks(builder); + if (Iterables.isEmpty(activeTasks)) { + LOG.info(String.format("Moving host %s into DRAINED", host)); + setMaintenanceMode(store, ImmutableSet.of(host), DRAINED); + } else { + LOG.info(String.format("Host %s is DRAINING with active tasks: %s", + host, + Tasks.ids(activeTasks))); } } }); @@ -179,12 +174,8 @@ public interface MaintenanceController { @Override public Set<HostStatus> startMaintenance(final Set<String> hosts) { - return storage.write(new MutateWork.Quiet<Set<HostStatus>>() { - @Override - public Set<HostStatus> apply(MutableStoreProvider storeProvider) { - return setMaintenanceMode(storeProvider, hosts, MaintenanceMode.SCHEDULED); - } - }); + return storage.write( + storeProvider -> setMaintenanceMode(storeProvider, hosts, MaintenanceMode.SCHEDULED)); } @VisibleForTesting @@ -193,73 +184,41 @@ public interface MaintenanceController { @Override public Set<HostStatus> drain(final Set<String> hosts) { - return storage.write(new MutateWork.Quiet<Set<HostStatus>>() { - @Override - public Set<HostStatus> apply(MutableStoreProvider store) { - return watchDrainingTasks(store, hosts); - } - }); + return storage.write(store -> watchDrainingTasks(store, hosts)); } private static final Function<IHostAttributes, String> HOST_NAME = - new Function<IHostAttributes, String>() { - @Override - public String apply(IHostAttributes attributes) { - return attributes.getHost(); - } - }; + IHostAttributes::getHost; private static final Function<IHostAttributes, HostStatus> ATTRS_TO_STATUS = - new Function<IHostAttributes, HostStatus>() { - @Override - public HostStatus apply(IHostAttributes attributes) { - return new HostStatus().setHost(attributes.getHost()).setMode(attributes.getMode()); - } - }; + attributes -> new HostStatus().setHost(attributes.getHost()).setMode(attributes.getMode()); private static final Function<HostStatus, MaintenanceMode> GET_MODE = - new Function<HostStatus, MaintenanceMode>() { - @Override - public MaintenanceMode apply(HostStatus status) { - return status.getMode(); - } - }; + HostStatus::getMode; @Override public MaintenanceMode getMode(final String host) { - return storage.read(new Work.Quiet<MaintenanceMode>() { - @Override - public MaintenanceMode apply(StoreProvider storeProvider) { - return storeProvider.getAttributeStore().getHostAttributes(host) - .transform(ATTRS_TO_STATUS) - .transform(GET_MODE) - .or(MaintenanceMode.NONE); - } - }); + return storage.read(storeProvider -> storeProvider.getAttributeStore().getHostAttributes(host) + .transform(ATTRS_TO_STATUS) + .transform(GET_MODE) + .or(MaintenanceMode.NONE)); } @Override public Set<HostStatus> getStatus(final Set<String> hosts) { - return storage.read(new Work.Quiet<Set<HostStatus>>() { - @Override - public Set<HostStatus> apply(StoreProvider storeProvider) { - // Warning - this is filtering _all_ host attributes. If using this to frequently query - // for a small set of hosts, a getHostAttributes variant should be added. - return FluentIterable.from(storeProvider.getAttributeStore().getHostAttributes()) - .filter(Predicates.compose(Predicates.in(hosts), HOST_NAME)) - .transform(ATTRS_TO_STATUS).toSet(); - } + return storage.read(storeProvider -> { + // Warning - this is filtering _all_ host attributes. If using this to frequently query + // for a small set of hosts, a getHostAttributes variant should be added. + return FluentIterable.from(storeProvider.getAttributeStore().getHostAttributes()) + .filter(Predicates.compose(Predicates.in(hosts), HOST_NAME)) + .transform(ATTRS_TO_STATUS).toSet(); }); } @Override public Set<HostStatus> endMaintenance(final Set<String> hosts) { - return storage.write(new MutateWork.Quiet<Set<HostStatus>>() { - @Override - public Set<HostStatus> apply(MutableStoreProvider storeProvider) { - return setMaintenanceMode(storeProvider, hosts, MaintenanceMode.NONE); - } - }); + return storage.write( + storeProvider -> setMaintenanceMode(storeProvider, hosts, MaintenanceMode.NONE)); } private Set<HostStatus> setMaintenanceMode( http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java index f27c93b..6503af2 100644 --- a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java +++ b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java @@ -54,7 +54,6 @@ import org.apache.aurora.scheduler.scheduling.RescheduleCalculator; import org.apache.aurora.scheduler.state.SideEffect.Action; import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; import org.apache.aurora.scheduler.storage.TaskStore; -import org.apache.aurora.scheduler.storage.TaskStore.Mutable.TaskMutation; import org.apache.aurora.scheduler.storage.entities.IAssignedTask; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.aurora.scheduler.storage.entities.ITaskConfig; @@ -63,7 +62,6 @@ import org.apache.mesos.Protos.SlaveID; import static java.util.Objects.requireNonNull; import static org.apache.aurora.common.base.MorePreconditions.checkNotBlank; - import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED; import static org.apache.aurora.gen.ScheduleStatus.INIT; import static org.apache.aurora.gen.ScheduleStatus.PENDING; @@ -121,12 +119,7 @@ public class StateManagerImpl implements StateManager { // Done outside the write transaction to minimize the work done inside a transaction. Set<IScheduledTask> scheduledTasks = FluentIterable.from(instanceIds) - .transform(new Function<Integer, IScheduledTask>() { - @Override - public IScheduledTask apply(Integer instanceId) { - return createTask(instanceId, task); - } - }).toSet(); + .transform(instanceId -> createTask(instanceId, task)).toSet(); Iterable<IScheduledTask> existingTasks = storeProvider.getTaskStore().fetchTasks( Query.jobScoped(task.getJob()).active()); @@ -182,16 +175,13 @@ public class StateManagerImpl implements StateManager { Query.Builder query = Query.taskScoped(taskId); storeProvider.getUnsafeTaskStore().mutateTasks(query, - new Function<IScheduledTask, IScheduledTask>() { - @Override - public IScheduledTask apply(IScheduledTask task) { - ScheduledTask builder = task.newBuilder(); - builder.getAssignedTask() - .setAssignedPorts(assignedPorts) - .setSlaveHost(slaveHost) - .setSlaveId(slaveId.getValue()); - return IScheduledTask.build(builder); - } + task -> { + ScheduledTask builder = task.newBuilder(); + builder.getAssignedTask() + .setAssignedPorts(assignedPorts) + .setSlaveHost(slaveHost) + .setSlaveId(slaveId.getValue()); + return IScheduledTask.build(builder); }); StateChangeResult changeResult = updateTaskAndExternalState( @@ -213,15 +203,12 @@ public class StateManagerImpl implements StateManager { @VisibleForTesting static final Supplier<String> LOCAL_HOST_SUPPLIER = Suppliers.memoize( - new Supplier<String>() { - @Override - public String get() { - try { - return InetAddress.getLocalHost().getHostName(); - } catch (UnknownHostException e) { - LOG.log(Level.SEVERE, "Failed to get self hostname."); - throw Throwables.propagate(e); - } + () -> { + try { + return InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + LOG.log(Level.SEVERE, "Failed to get self hostname."); + throw Throwables.propagate(e); } }); @@ -252,12 +239,7 @@ public class StateManagerImpl implements StateManager { } private static final Function<SideEffect, Action> GET_ACTION = - new Function<SideEffect, Action>() { - @Override - public Action apply(SideEffect sideEffect) { - return sideEffect.getAction(); - } - }; + SideEffect::getAction; private static final List<Action> ACTIONS_IN_ORDER = ImmutableList.of( Action.INCREMENT_FAILURES, @@ -309,13 +291,8 @@ public class StateManagerImpl implements StateManager { switch (sideEffect.getAction()) { case INCREMENT_FAILURES: - taskStore.mutateTasks(query, new TaskMutation() { - @Override - public IScheduledTask apply(IScheduledTask task) { - return IScheduledTask.build( - task.newBuilder().setFailureCount(task.getFailureCount() + 1)); - } - }); + taskStore.mutateTasks(query, task1 -> IScheduledTask.build( + task1.newBuilder().setFailureCount(task1.getFailureCount() + 1))); break; case SAVE_STATE: @@ -323,18 +300,15 @@ public class StateManagerImpl implements StateManager { upToDateTask.isPresent(), "Operation expected task " + taskId + " to be present."); - taskStore.mutateTasks(query, new TaskMutation() { - @Override - public IScheduledTask apply(IScheduledTask task) { - ScheduledTask mutableTask = task.newBuilder(); - mutableTask.setStatus(targetState.get()); - mutableTask.addToTaskEvents(new TaskEvent() - .setTimestamp(clock.nowMillis()) - .setStatus(targetState.get()) - .setMessage(transitionMessage.orNull()) - .setScheduler(LOCAL_HOST_SUPPLIER.get())); - return IScheduledTask.build(mutableTask); - } + taskStore.mutateTasks(query, task1 -> { + ScheduledTask mutableTask = task1.newBuilder(); + mutableTask.setStatus(targetState.get()); + mutableTask.addToTaskEvents(new TaskEvent() + .setTimestamp(clock.nowMillis()) + .setStatus(targetState.get()) + .setMessage(transitionMessage.orNull()) + .setScheduler(LOCAL_HOST_SUPPLIER.get())); + return IScheduledTask.build(mutableTask); }); events.add( PubsubEvent.TaskStateChange.transition( http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java index de7ebb3..50868ea 100644 --- a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java +++ b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java @@ -23,14 +23,12 @@ import java.util.logging.Logger; import javax.inject.Inject; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.FluentIterable; import org.apache.aurora.common.inject.TimedInterceptor.Timed; import org.apache.aurora.common.stats.Stats; - import org.apache.aurora.scheduler.HostOffer; import org.apache.aurora.scheduler.Resources; import org.apache.aurora.scheduler.TierInfo; @@ -117,12 +115,7 @@ public interface TaskAssigner { final Iterator<String> names = requestedPorts.iterator(); Map<String, Integer> portsByName = FluentIterable.from(selectedPorts) - .uniqueIndex(new Function<Object, String>() { - @Override - public String apply(Object input) { - return names.next(); - } - }); + .uniqueIndex(input -> names.next()); IAssignedTask assigned = stateManager.assignTask( storeProvider, http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java b/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java index 9ace5b0..b8d8bf9 100644 --- a/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java +++ b/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java @@ -91,12 +91,7 @@ class TaskStateMachine { private final Set<SideEffect> sideEffects = Sets.newHashSet(); private static final Function<ScheduleStatus, TaskState> STATUS_TO_TASK_STATE = - new Function<ScheduleStatus, TaskState>() { - @Override - public TaskState apply(ScheduleStatus input) { - return TaskState.valueOf(input.name()); - } - }; + input -> TaskState.valueOf(input.name()); private static final Function<IScheduledTask, TaskState> SCHEDULED_TO_TASK_STATE = Functions.compose(STATUS_TO_TASK_STATE, IScheduledTask::getStatus); @@ -183,52 +178,46 @@ class TaskStateMachine { .build()); final Closure<Transition<TaskState>> manageRestartingTask = - new Closure<Transition<TaskState>>() { - @Override - public void execute(Transition<TaskState> transition) { - switch (transition.getTo()) { - case ASSIGNED: - addFollowup(KILL); - break; - - case STARTING: - addFollowup(KILL); - break; - - case RUNNING: - addFollowup(KILL); - break; - - case LOST: - addFollowup(KILL); - addFollowup(RESCHEDULE); - break; - - case FINISHED: - addFollowup(RESCHEDULE); - break; - - case FAILED: - addFollowup(RESCHEDULE); - break; - - case KILLED: - addFollowup(RESCHEDULE); - break; - - default: - // No-op. - } + transition -> { + switch (transition.getTo()) { + case ASSIGNED: + addFollowup(KILL); + break; + + case STARTING: + addFollowup(KILL); + break; + + case RUNNING: + addFollowup(KILL); + break; + + case LOST: + addFollowup(KILL); + addFollowup(RESCHEDULE); + break; + + case FINISHED: + addFollowup(RESCHEDULE); + break; + + case FAILED: + addFollowup(RESCHEDULE); + break; + + case KILLED: + addFollowup(RESCHEDULE); + break; + + default: + // No-op. } }; // To be called on a task transitioning into the FINISHED state. - final Command rescheduleIfService = new Command() { - @Override - public void execute() { - if (task.get().getAssignedTask().getTask().isIsService()) { - addFollowup(RESCHEDULE); - } + final Command rescheduleIfService = () -> { + if (task.get().getAssignedTask().getTask().isIsService()) { + addFollowup(RESCHEDULE); } }; @@ -275,46 +264,43 @@ class TaskStateMachine { .to(STARTING, RUNNING, FINISHED, FAILED, RESTARTING, DRAINING, KILLED, KILLING, LOST, PREEMPTING) .withCallback( - new Closure<Transition<TaskState>>() { - @Override - public void execute(Transition<TaskState> transition) { - switch (transition.getTo()) { - case FINISHED: - rescheduleIfService.execute(); - break; - - case PREEMPTING: - addFollowup(KILL); - break; - - case FAILED: - incrementFailuresMaybeReschedule.execute(); - break; - - case RESTARTING: - addFollowup(KILL); - break; - - case DRAINING: - addFollowup(KILL); - break; - - case KILLED: - addFollowup(RESCHEDULE); - break; - - case LOST: - addFollowup(RESCHEDULE); - addFollowup(KILL); - break; - - case KILLING: - addFollowup(KILL); - break; - - default: - // No-op. - } + transition -> { + switch (transition.getTo()) { + case FINISHED: + rescheduleIfService.execute(); + break; + + case PREEMPTING: + addFollowup(KILL); + break; + + case FAILED: + incrementFailuresMaybeReschedule.execute(); + break; + + case RESTARTING: + addFollowup(KILL); + break; + + case DRAINING: + addFollowup(KILL); + break; + + case KILLED: + addFollowup(RESCHEDULE); + break; + + case LOST: + addFollowup(RESCHEDULE); + addFollowup(KILL); + break; + + case KILLING: + addFollowup(KILL); + break; + + default: + // No-op. } } )) @@ -323,45 +309,42 @@ class TaskStateMachine { .to(RUNNING, FINISHED, FAILED, RESTARTING, DRAINING, KILLING, KILLED, LOST, PREEMPTING) .withCallback( - new Closure<Transition<TaskState>>() { - @Override - public void execute(Transition<TaskState> transition) { - switch (transition.getTo()) { - case FINISHED: - rescheduleIfService.execute(); - break; - - case RESTARTING: - addFollowup(KILL); - break; - - case DRAINING: - addFollowup(KILL); - break; - - case PREEMPTING: - addFollowup(KILL); - break; - - case FAILED: - incrementFailuresMaybeReschedule.execute(); - break; - - case KILLED: - addFollowup(RESCHEDULE); - break; - - case KILLING: - addFollowup(KILL); - break; - - case LOST: - addFollowup(RESCHEDULE); - break; - - default: - // No-op. - } + transition -> { + switch (transition.getTo()) { + case FINISHED: + rescheduleIfService.execute(); + break; + + case RESTARTING: + addFollowup(KILL); + break; + + case DRAINING: + addFollowup(KILL); + break; + + case PREEMPTING: + addFollowup(KILL); + break; + + case FAILED: + incrementFailuresMaybeReschedule.execute(); + break; + + case KILLED: + addFollowup(RESCHEDULE); + break; + + case KILLING: + addFollowup(KILL); + break; + + case LOST: + addFollowup(RESCHEDULE); + break; + + default: + // No-op. } } )) @@ -369,45 +352,42 @@ class TaskStateMachine { Rule.from(RUNNING) .to(FINISHED, RESTARTING, DRAINING, FAILED, KILLING, KILLED, LOST, PREEMPTING) .withCallback( - new Closure<Transition<TaskState>>() { - @Override - public void execute(Transition<TaskState> transition) { - switch (transition.getTo()) { - case FINISHED: - rescheduleIfService.execute(); - break; - - case PREEMPTING: - addFollowup(KILL); - break; - - case RESTARTING: - addFollowup(KILL); - break; - - case DRAINING: - addFollowup(KILL); - break; - - case FAILED: - incrementFailuresMaybeReschedule.execute(); - break; - - case KILLED: - addFollowup(RESCHEDULE); - break; - - case KILLING: - addFollowup(KILL); - break; - - case LOST: - addFollowup(RESCHEDULE); - break; - - default: - // No-op. - } + transition -> { + switch (transition.getTo()) { + case FINISHED: + rescheduleIfService.execute(); + break; + + case PREEMPTING: + addFollowup(KILL); + break; + + case RESTARTING: + addFollowup(KILL); + break; + + case DRAINING: + addFollowup(KILL); + break; + + case FAILED: + incrementFailuresMaybeReschedule.execute(); + break; + + case KILLED: + addFollowup(RESCHEDULE); + break; + + case KILLING: + addFollowup(KILL); + break; + + case LOST: + addFollowup(RESCHEDULE); + break; + + default: + // No-op. } } )) @@ -495,12 +475,7 @@ class TaskStateMachine { } private Closure<Transition<TaskState>> addFollowupClosure(final Action action) { - return new Closure<Transition<TaskState>>() { - @Override - public void execute(Transition<TaskState> item) { - addFollowup(action); - } - }; + return item -> addFollowup(action); } /** @@ -549,12 +524,7 @@ class TaskStateMachine { */ @Nullable ScheduleStatus getPreviousState() { - return previousState.transform(new Function<TaskState, ScheduleStatus>() { - @Override - public ScheduleStatus apply(TaskState item) { - return item.getStatus().orNull(); - } - }).orNull(); + return previousState.transform(item -> item.getStatus().orNull()).orNull(); } @Override
