http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/stats/ResourceCounter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/stats/ResourceCounter.java b/src/main/java/org/apache/aurora/scheduler/stats/ResourceCounter.java index c8ef858..5231e9f 100644 --- a/src/main/java/org/apache/aurora/scheduler/stats/ResourceCounter.java +++ b/src/main/java/org/apache/aurora/scheduler/stats/ResourceCounter.java @@ -34,8 +34,6 @@ import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.Storage.StorageException; -import org.apache.aurora.scheduler.storage.Storage.StoreProvider; -import org.apache.aurora.scheduler.storage.Storage.Work; import org.apache.aurora.scheduler.storage.entities.IResourceAggregate; import org.apache.aurora.scheduler.storage.entities.ITaskConfig; @@ -61,12 +59,7 @@ public class ResourceCounter { } private static final Function<MetricType, GlobalMetric> TO_GLOBAL_METRIC = - new Function<MetricType, GlobalMetric>() { - @Override - public GlobalMetric apply(MetricType type) { - return new GlobalMetric(type); - } - }; + GlobalMetric::new; /** * Computes totals for each of the {@link MetricType}s. @@ -94,15 +87,12 @@ public class ResourceCounter { * @throws StorageException if there was a problem fetching quotas from storage. */ public Metric computeQuotaAllocationTotals() throws StorageException { - return storage.read(new Work.Quiet<Metric>() { - @Override - public Metric apply(StoreProvider storeProvider) { - Metric allocation = new Metric(); - for (IResourceAggregate quota : storeProvider.getQuotaStore().fetchQuotas().values()) { - allocation.accumulate(quota); - } - return allocation; + return storage.read(storeProvider -> { + Metric allocation = new Metric(); + for (IResourceAggregate quota : storeProvider.getQuotaStore().fetchQuotas().values()) { + allocation.accumulate(quota); } + return allocation; }); }
http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/stats/SlotSizeCounter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/stats/SlotSizeCounter.java b/src/main/java/org/apache/aurora/scheduler/stats/SlotSizeCounter.java index ced6b67..c9e57ec 100644 --- a/src/main/java/org/apache/aurora/scheduler/stats/SlotSizeCounter.java +++ b/src/main/java/org/apache/aurora/scheduler/stats/SlotSizeCounter.java @@ -126,12 +126,8 @@ class SlotSizeCounter implements Runnable { } private int countSlots(Iterable<IResourceAggregate> slots, final IResourceAggregate slotSize) { - Function<IResourceAggregate, Integer> counter = new Function<IResourceAggregate, Integer>() { - @Override - public Integer apply(IResourceAggregate machineSlack) { - return ResourceAggregates.divide(machineSlack, slotSize); - } - }; + Function<IResourceAggregate, Integer> counter = + machineSlack -> ResourceAggregates.divide(machineSlack, slotSize); int sum = 0; for (int slotCount : FluentIterable.from(slots).transform(counter)) { http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/stats/StatsModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/stats/StatsModule.java b/src/main/java/org/apache/aurora/scheduler/stats/StatsModule.java index 6399e78..4767ef1 100644 --- a/src/main/java/org/apache/aurora/scheduler/stats/StatsModule.java +++ b/src/main/java/org/apache/aurora/scheduler/stats/StatsModule.java @@ -61,12 +61,7 @@ public class StatsModule extends AbstractModule { bind(TimeSeriesRepositoryImpl.class).in(Singleton.class); bind(new TypeLiteral<Supplier<Iterable<Stat<?>>>>() { }).toInstance( - new Supplier<Iterable<Stat<?>>>() { - @Override - public Iterable<Stat<?>> get() { - return Stats.getVariables(); - } - } + Stats::getVariables ); SchedulerServicesModule.addAppStartupServiceBinding(binder()) http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java index 2136154..6a5069f 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java @@ -30,6 +30,7 @@ import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.events.EventSink; import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; +import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult.Quiet; import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; @@ -91,14 +92,11 @@ public class CallOrderEnforcingStorage implements NonVolatileStorage { checkInState(State.PREPARED); wrapped.start(initializationLogic); stateMachine.transition(State.READY); - wrapped.write(new MutateWork.NoResult.Quiet() { - @Override - public void execute(MutableStoreProvider storeProvider) { - Iterable<IScheduledTask> tasks = Tasks.LATEST_ACTIVITY.sortedCopy( - storeProvider.getTaskStore().fetchTasks(Query.unscoped())); - for (IScheduledTask task : tasks) { - eventSink.post(TaskStateChange.initialized(task)); - } + wrapped.write((NoResult.Quiet) (MutableStoreProvider storeProvider) -> { + Iterable<IScheduledTask> tasks = Tasks.LATEST_ACTIVITY.sortedCopy( + storeProvider.getTaskStore().fetchTasks(Query.unscoped())); + for (IScheduledTask task : tasks) { + eventSink.post(TaskStateChange.initialized(task)); } }); } http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/storage/Storage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java index f699ba5..6109158 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java @@ -292,21 +292,11 @@ public interface Storage { * @return Tasks returned from the query. */ public static Iterable<IScheduledTask> fetchTasks(Storage storage, final Builder query) { - return storage.read(new Work.Quiet<Iterable<IScheduledTask>>() { - @Override - public Iterable<IScheduledTask> apply(StoreProvider storeProvider) { - return storeProvider.getTaskStore().fetchTasks(query); - } - }); + return storage.read(storeProvider -> storeProvider.getTaskStore().fetchTasks(query)); } public static Iterable<IJobConfiguration> fetchCronJobs(Storage storage) { - return storage.read(new Work.Quiet<Iterable<IJobConfiguration>>() { - @Override - public Iterable<IJobConfiguration> apply(Storage.StoreProvider storeProvider) { - return storeProvider.getCronJobStore().fetchJobs(); - } - }); + return storage.read(storeProvider -> storeProvider.getCronJobStore().fetchJobs()); } } } http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/storage/StorageBackfill.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/StorageBackfill.java b/src/main/java/org/apache/aurora/scheduler/storage/StorageBackfill.java index a0483f4..5401a28 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/StorageBackfill.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/StorageBackfill.java @@ -24,7 +24,6 @@ import org.apache.aurora.gen.TaskConfig; import org.apache.aurora.scheduler.base.JobKeys; import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; -import org.apache.aurora.scheduler.storage.TaskStore.Mutable.TaskMutation; import org.apache.aurora.scheduler.storage.entities.IJobConfiguration; import org.apache.aurora.scheduler.storage.entities.IJobKey; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; @@ -75,13 +74,10 @@ public final class StorageBackfill { // Backfilling job keys has to be done in a separate transaction to ensure follow up scoped // Query calls work against upgraded MemTaskStore, which does not support deprecated fields. LOG.info("Backfilling task config job keys."); - storeProvider.getUnsafeTaskStore().mutateTasks(Query.unscoped(), new TaskMutation() { - @Override - public IScheduledTask apply(final IScheduledTask task) { - ScheduledTask builder = task.newBuilder(); - populateJobKey(builder.getAssignedTask().getTask(), BACKFILLED_TASK_CONFIG_KEYS); - return IScheduledTask.build(builder); - } + storeProvider.getUnsafeTaskStore().mutateTasks(Query.unscoped(), task -> { + ScheduledTask builder = task.newBuilder(); + populateJobKey(builder.getAssignedTask().getTask(), BACKFILLED_TASK_CONFIG_KEYS); + return IScheduledTask.build(builder); }); } } http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java b/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java index 142e4ac..62639c4 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java @@ -118,47 +118,44 @@ public interface TaskStore { } public static Predicate<IScheduledTask> queryFilter(final Query.Builder queryBuilder) { - return new Predicate<IScheduledTask>() { - @Override - public boolean apply(IScheduledTask task) { - TaskQuery query = queryBuilder.get(); - ITaskConfig config = task.getAssignedTask().getTask(); - // TODO(wfarner): Investigate why blank inputs are treated specially for the role field. - if (query.getRole() != null - && !WHITESPACE.matchesAllOf(query.getRole()) - && !query.getRole().equals(config.getJob().getRole())) { - return false; - } - if (query.getEnvironment() != null - && !query.getEnvironment().equals(config.getEnvironment())) { - return false; - } - if (query.getJobName() != null && !query.getJobName().equals(config.getJobName())) { - return false; - } - - if (query.getJobKeysSize() > 0 - && !query.getJobKeys().contains(config.getJob().newBuilder())) { - return false; - } - if (query.getTaskIds() != null && !query.getTaskIds().contains(Tasks.id(task))) { - return false; - } - - if (query.getStatusesSize() > 0 && !query.getStatuses().contains(task.getStatus())) { - return false; - } - if (query.getSlaveHostsSize() > 0 - && !query.getSlaveHosts().contains(task.getAssignedTask().getSlaveHost())) { - return false; - } - if (query.getInstanceIdsSize() > 0 - && !query.getInstanceIds().contains(task.getAssignedTask().getInstanceId())) { - return false; - } - - return true; + return task -> { + TaskQuery query = queryBuilder.get(); + ITaskConfig config = task.getAssignedTask().getTask(); + // TODO(wfarner): Investigate why blank inputs are treated specially for the role field. + if (query.getRole() != null + && !WHITESPACE.matchesAllOf(query.getRole()) + && !query.getRole().equals(config.getJob().getRole())) { + return false; } + if (query.getEnvironment() != null + && !query.getEnvironment().equals(config.getEnvironment())) { + return false; + } + if (query.getJobName() != null && !query.getJobName().equals(config.getJobName())) { + return false; + } + + if (query.getJobKeysSize() > 0 + && !query.getJobKeys().contains(config.getJob().newBuilder())) { + return false; + } + if (query.getTaskIds() != null && !query.getTaskIds().contains(Tasks.id(task))) { + return false; + } + + if (query.getStatusesSize() > 0 && !query.getStatuses().contains(task.getStatus())) { + return false; + } + if (query.getSlaveHostsSize() > 0 + && !query.getSlaveHosts().contains(task.getAssignedTask().getSlaveHost())) { + return false; + } + if (query.getInstanceIdsSize() > 0 + && !query.getInstanceIds().contains(task.getAssignedTask().getInstanceId())) { + return false; + } + + return true; }; } } http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java b/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java index 18daa05..0f0218c 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java @@ -34,7 +34,7 @@ import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.storage.DistributedSnapshotStore; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; -import org.apache.aurora.scheduler.storage.Storage.MutateWork; +import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import static java.util.Objects.requireNonNull; @@ -190,15 +190,12 @@ public interface Recovery { } void commit() { - primaryStorage.write(new MutateWork.NoResult.Quiet() { - @Override - public void execute(MutableStoreProvider storeProvider) { - try { - distributedStore.persist(tempStorage.toSnapshot()); - shutDownNow.execute(); - } catch (CodingException e) { - throw new IllegalStateException("Failed to encode snapshot.", e); - } + primaryStorage.write((NoResult.Quiet) (MutableStoreProvider storeProvider) -> { + try { + distributedStore.persist(tempStorage.toSnapshot()); + shutDownNow.execute(); + } catch (CodingException e) { + throw new IllegalStateException("Failed to encode snapshot.", e); } }); } http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/storage/backup/StorageBackup.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/backup/StorageBackup.java b/src/main/java/org/apache/aurora/scheduler/storage/backup/StorageBackup.java index c4e18d4..2cd8793 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/backup/StorageBackup.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/backup/StorageBackup.java @@ -138,12 +138,7 @@ public interface StorageBackup { public Snapshot createSnapshot() { final Snapshot snapshot = delegate.createSnapshot(); if (clock.nowMillis() >= (lastBackupMs + backupIntervalMs)) { - executor.execute(new Runnable() { - @Override - public void run() { - save(snapshot); - } - }); + executor.execute(() -> save(snapshot)); } return snapshot; } @@ -209,20 +204,10 @@ public interface StorageBackup { } } - private static final FilenameFilter BACKUP_FILTER = new FilenameFilter() { - @Override - public boolean accept(File file, String s) { - return s.startsWith(FILE_PREFIX); - } - }; + private static final FilenameFilter BACKUP_FILTER = (file, s) -> s.startsWith(FILE_PREFIX); @VisibleForTesting - static final Function<File, String> FILE_NAME = new Function<File, String>() { - @Override - public String apply(File file) { - return file.getName(); - } - }; + static final Function<File, String> FILE_NAME = File::getName; @Override public void applySnapshot(Snapshot snapshot) { http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java index 6af059d..f683f79 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java @@ -26,9 +26,7 @@ import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.storage.SnapshotStore; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; -import org.apache.aurora.scheduler.storage.Storage.MutateWork; -import org.apache.aurora.scheduler.storage.Storage.StoreProvider; -import org.apache.aurora.scheduler.storage.Storage.Work; +import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; import org.apache.aurora.scheduler.storage.db.DbUtil; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.aurora.scheduler.storage.log.SnapshotStoreImpl; @@ -82,25 +80,17 @@ interface TemporaryStorage { return new TemporaryStorage() { @Override public void deleteTasks(final Query.Builder query) { - storage.write(new MutateWork.NoResult.Quiet() { - @Override - public void execute(MutableStoreProvider storeProvider) { - Set<String> ids = FluentIterable.from(storeProvider.getTaskStore().fetchTasks(query)) - .transform(Tasks::id) - .toSet(); - storeProvider.getUnsafeTaskStore().deleteTasks(ids); - } + storage.write((NoResult.Quiet) (MutableStoreProvider storeProvider) -> { + Set<String> ids = FluentIterable.from(storeProvider.getTaskStore().fetchTasks(query)) + .transform(Tasks::id) + .toSet(); + storeProvider.getUnsafeTaskStore().deleteTasks(ids); }); } @Override public Iterable<IScheduledTask> fetchTasks(final Query.Builder query) { - return storage.read(new Work.Quiet<Iterable<IScheduledTask>>() { - @Override - public Iterable<IScheduledTask> apply(StoreProvider storeProvider) { - return storeProvider.getTaskStore().fetchTasks(query); - } - }); + return storage.read(storeProvider -> storeProvider.getTaskStore().fetchTasks(query)); } @Override http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/storage/db/DbAttributeStore.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbAttributeStore.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbAttributeStore.java index ee3ff6c..6901098 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbAttributeStore.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbAttributeStore.java @@ -79,12 +79,8 @@ class DbAttributeStore implements AttributeStore.Mutable { return true; } - private static final Predicate<IAttribute> EMPTY_VALUES = new Predicate<IAttribute>() { - @Override - public boolean apply(IAttribute attribute) { - return attribute.getValues().isEmpty(); - } - }; + private static final Predicate<IAttribute> EMPTY_VALUES = + attribute -> attribute.getValues().isEmpty(); @Timed("attribute_store_fetch_one") @Override http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/storage/db/DbJobUpdateStore.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbJobUpdateStore.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbJobUpdateStore.java index 7652132..d2673e6 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbJobUpdateStore.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbJobUpdateStore.java @@ -158,12 +158,7 @@ public class DbJobUpdateStore implements JobUpdateStore.Mutable { } private static final Function<PruneVictim, IJobUpdateKey> GET_UPDATE_KEY = - new Function<PruneVictim, IJobUpdateKey>() { - @Override - public IJobUpdateKey apply(PruneVictim victim) { - return IJobUpdateKey.build(victim.getUpdate()); - } - }; + victim -> IJobUpdateKey.build(victim.getUpdate()); @Timed("job_update_store_prune_history") @Override http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java index dd7e1d3..7674b8a 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java @@ -155,14 +155,11 @@ class DbStorage extends AbstractIdleService implements Storage { // introduction of DbStorage, but should be revisited. // TODO(wfarner): Consider revisiting to execute async work only when the transaction is // successful. - return gatedWorkQueue.closeDuring(new GatedOperation<T, E>() { - @Override - public T doWithGateClosed() throws E { - try { - return transactionedWrite(work); - } catch (PersistenceException e) { - throw new StorageException(e.getMessage(), e); - } + return gatedWorkQueue.closeDuring((GatedOperation<T, E>) () -> { + try { + return transactionedWrite(work); + } catch (PersistenceException e) { + throw new StorageException(e.getMessage(), e); } }); } @@ -179,23 +176,20 @@ class DbStorage extends AbstractIdleService implements Storage { public <E extends Exception> void bulkLoad(MutateWork.NoResult<E> work) throws StorageException, E { - gatedWorkQueue.closeDuring(new GatedOperation<Void, E>() { - @Override - public Void doWithGateClosed() throws E { - // Disabling the undo log disables transaction rollback, but dramatically speeds up a bulk - // insert. - try (SqlSession session = sessionFactory.openSession(false)) { - try { - session.update(DISABLE_UNDO_LOG); - work.apply(storeProvider); - } catch (PersistenceException e) { - throw new StorageException(e.getMessage(), e); - } finally { - session.update(ENABLE_UNDO_LOG); - } + gatedWorkQueue.closeDuring(() -> { + // Disabling the undo log disables transaction rollback, but dramatically speeds up a bulk + // insert. + try (SqlSession session = sessionFactory.openSession(false)) { + try { + session.update(DISABLE_UNDO_LOG); + work.apply(storeProvider); + } catch (PersistenceException e) { + throw new StorageException(e.getMessage(), e); + } finally { + session.update(ENABLE_UNDO_LOG); } - return null; } + return null; }); } http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/storage/db/RowGarbageCollector.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/RowGarbageCollector.java b/src/main/java/org/apache/aurora/scheduler/storage/db/RowGarbageCollector.java index df6e583..2684054 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/db/RowGarbageCollector.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/db/RowGarbageCollector.java @@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.AbstractScheduledService; import org.apache.aurora.scheduler.storage.Storage; +import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; import org.apache.ibatis.exceptions.PersistenceException; import org.apache.ibatis.session.SqlSession; import org.apache.ibatis.session.SqlSessionFactory; @@ -72,18 +73,15 @@ class RowGarbageCollector extends AbstractScheduledService { final AtomicLong deletedCount = new AtomicLong(); for (Class<? extends GarbageCollectedTableMapper> tableClass : TABLES) { - storage.write(new Storage.MutateWork.NoResult.Quiet() { - @Override - public void execute(Storage.MutableStoreProvider storeProvider) { - try (SqlSession session = sessionFactory.openSession(true)) { - GarbageCollectedTableMapper table = session.getMapper(tableClass); - for (long rowId : table.selectAllRowIds()) { - try { - table.deleteRow(rowId); - deletedCount.incrementAndGet(); - } catch (PersistenceException e) { - // Expected for rows that are still referenced. - } + storage.write((NoResult.Quiet) (Storage.MutableStoreProvider storeProvider) -> { + try (SqlSession session = sessionFactory.openSession(true)) { + GarbageCollectedTableMapper table = session.getMapper(tableClass); + for (long rowId : table.selectAllRowIds()) { + try { + table.deleteRow(rowId); + deletedCount.incrementAndGet(); + } catch (PersistenceException e) { + // Expected for rows that are still referenced. } } } http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/storage/log/EntrySerializer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/EntrySerializer.java b/src/main/java/org/apache/aurora/scheduler/storage/log/EntrySerializer.java index 9b6add9..dea2bda 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/log/EntrySerializer.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/log/EntrySerializer.java @@ -80,12 +80,7 @@ public interface EntrySerializer { final byte[] header = encode( Frame.header(new FrameHeader(chunks, ByteBuffer.wrap(checksum(entry))))); - return new Iterable<byte[]>() { - @Override - public Iterator<byte[]> iterator() { - return streamFrames(header, chunks, entry); - } - }; + return () -> streamFrames(header, chunks, entry); } Iterator<byte[]> streamFrames(final byte[] header, final int chunks, final byte[] entry) { http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java index adbf459..8aaff22 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java @@ -32,7 +32,6 @@ import com.google.common.util.concurrent.MoreExecutors; import org.apache.aurora.codec.ThriftBinaryCodec.CodingException; import org.apache.aurora.common.application.ShutdownRegistry; import org.apache.aurora.common.base.Closure; -import org.apache.aurora.common.base.Command; import org.apache.aurora.common.inject.TimedInterceptor.Timed; import org.apache.aurora.common.quantity.Amount; import org.apache.aurora.common.quantity.Time; @@ -61,6 +60,7 @@ import org.apache.aurora.scheduler.storage.QuotaStore; import org.apache.aurora.scheduler.storage.SchedulerStore; import org.apache.aurora.scheduler.storage.SnapshotStore; import org.apache.aurora.scheduler.storage.Storage; +import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage; import org.apache.aurora.scheduler.storage.TaskStore; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; @@ -158,16 +158,10 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore ScheduledExecutorSchedulingService(ShutdownRegistry shutdownRegistry, Amount<Long, Time> shutdownGracePeriod) { scheduledExecutor = AsyncUtil.singleThreadLoggingScheduledExecutor("LogStorage-%d", LOG); - shutdownRegistry.addAction(new Command() { - - @Override - public void execute() throws RuntimeException { - MoreExecutors.shutdownAndAwaitTermination( - scheduledExecutor, - shutdownGracePeriod.getValue(), - shutdownGracePeriod.getUnit().getTimeUnit()); - } - }); + shutdownRegistry.addAction(() -> MoreExecutors.shutdownAndAwaitTermination( + scheduledExecutor, + shutdownGracePeriod.getValue(), + shutdownGracePeriod.getUnit().getTimeUnit())); } @Override @@ -311,32 +305,21 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore @VisibleForTesting final Map<LogEntry._Fields, Closure<LogEntry>> buildLogEntryReplayActions() { return ImmutableMap.<LogEntry._Fields, Closure<LogEntry>>builder() - .put(LogEntry._Fields.SNAPSHOT, new Closure<LogEntry>() { - @Override - public void execute(LogEntry logEntry) { - Snapshot snapshot = logEntry.getSnapshot(); - LOG.info("Applying snapshot taken on " + new Date(snapshot.getTimestamp())); - snapshotStore.applySnapshot(snapshot); - } + .put(LogEntry._Fields.SNAPSHOT, logEntry -> { + Snapshot snapshot = logEntry.getSnapshot(); + LOG.info("Applying snapshot taken on " + new Date(snapshot.getTimestamp())); + snapshotStore.applySnapshot(snapshot); }) - .put(LogEntry._Fields.TRANSACTION, new Closure<LogEntry>() { + .put(LogEntry._Fields.TRANSACTION, logEntry -> write(new MutateWork.NoResult.Quiet() { @Override - public void execute(final LogEntry logEntry) { - write(new MutateWork.NoResult.Quiet() { - @Override - public void execute(MutableStoreProvider unused) { - for (Op op : logEntry.getTransaction().getOps()) { - replayOp(op); - } - } - }); - } - }) - .put(LogEntry._Fields.NOOP, new Closure<LogEntry>() { - @Override - public void execute(LogEntry item) { - // Nothing to do here + public void execute(MutableStoreProvider unused) { + for (Op op : logEntry.getTransaction().getOps()) { + replayOp(op); + } } + })) + .put(LogEntry._Fields.NOOP, item -> { + // Nothing to do here }) .build(); } @@ -344,124 +327,77 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore @VisibleForTesting final Map<Op._Fields, Closure<Op>> buildTransactionReplayActions() { return ImmutableMap.<Op._Fields, Closure<Op>>builder() - .put(Op._Fields.SAVE_FRAMEWORK_ID, new Closure<Op>() { - @Override - public void execute(Op op) { - writeBehindSchedulerStore.saveFrameworkId(op.getSaveFrameworkId().getId()); - } + .put( + Op._Fields.SAVE_FRAMEWORK_ID, + op -> writeBehindSchedulerStore.saveFrameworkId(op.getSaveFrameworkId().getId())) + .put(Op._Fields.SAVE_CRON_JOB, op -> { + SaveCronJob cronJob = op.getSaveCronJob(); + writeBehindJobStore.saveAcceptedJob( + IJobConfiguration.build(cronJob.getJobConfig())); }) - .put(Op._Fields.SAVE_CRON_JOB, new Closure<Op>() { - @Override - public void execute(Op op) { - SaveCronJob cronJob = op.getSaveCronJob(); - writeBehindJobStore.saveAcceptedJob( - IJobConfiguration.build(cronJob.getJobConfig())); - } + .put( + Op._Fields.REMOVE_JOB, + op -> writeBehindJobStore.removeJob(IJobKey.build(op.getRemoveJob().getJobKey()))) + .put( + Op._Fields.SAVE_TASKS, + op -> writeBehindTaskStore.saveTasks( + IScheduledTask.setFromBuilders(op.getSaveTasks().getTasks()))) + .put(Op._Fields.REWRITE_TASK, op -> { + RewriteTask rewriteTask = op.getRewriteTask(); + writeBehindTaskStore.unsafeModifyInPlace( + rewriteTask.getTaskId(), + ITaskConfig.build(rewriteTask.getTask())); }) - .put(Op._Fields.REMOVE_JOB, new Closure<Op>() { - @Override - public void execute(Op op) { - writeBehindJobStore.removeJob(IJobKey.build(op.getRemoveJob().getJobKey())); - } + .put( + Op._Fields.REMOVE_TASKS, + op -> writeBehindTaskStore.deleteTasks(op.getRemoveTasks().getTaskIds())) + .put(Op._Fields.SAVE_QUOTA, op -> { + SaveQuota saveQuota = op.getSaveQuota(); + writeBehindQuotaStore.saveQuota( + saveQuota.getRole(), + IResourceAggregate.build(saveQuota.getQuota())); }) - .put(Op._Fields.SAVE_TASKS, new Closure<Op>() { - @Override - public void execute(Op op) { - writeBehindTaskStore.saveTasks( - IScheduledTask.setFromBuilders(op.getSaveTasks().getTasks())); + .put( + Op._Fields.REMOVE_QUOTA, + op -> writeBehindQuotaStore.removeQuota(op.getRemoveQuota().getRole())) + .put(Op._Fields.SAVE_HOST_ATTRIBUTES, op -> { + HostAttributes attributes = op.getSaveHostAttributes().getHostAttributes(); + // Prior to commit 5cf760b, the store would persist maintenance mode changes for + // unknown hosts. 5cf760b began rejecting these, but the replicated log may still + // contain entries with a null slave ID. + if (attributes.isSetSlaveId()) { + writeBehindAttributeStore.saveHostAttributes(IHostAttributes.build(attributes)); + } else { + LOG.info("Dropping host attributes with no slave ID: " + attributes); } }) - .put(Op._Fields.REWRITE_TASK, new Closure<Op>() { - @Override - public void execute(Op op) { - RewriteTask rewriteTask = op.getRewriteTask(); - writeBehindTaskStore.unsafeModifyInPlace( - rewriteTask.getTaskId(), - ITaskConfig.build(rewriteTask.getTask())); - } - }) - .put(Op._Fields.REMOVE_TASKS, new Closure<Op>() { - @Override - public void execute(Op op) { - writeBehindTaskStore.deleteTasks(op.getRemoveTasks().getTaskIds()); - } - }) - .put(Op._Fields.SAVE_QUOTA, new Closure<Op>() { - @Override - public void execute(Op op) { - SaveQuota saveQuota = op.getSaveQuota(); - writeBehindQuotaStore.saveQuota( - saveQuota.getRole(), - IResourceAggregate.build(saveQuota.getQuota())); - } + .put( + Op._Fields.SAVE_LOCK, + op -> writeBehindLockStore.saveLock(ILock.build(op.getSaveLock().getLock()))) + .put( + Op._Fields.REMOVE_LOCK, + op -> writeBehindLockStore.removeLock(ILockKey.build(op.getRemoveLock().getLockKey()))) + .put(Op._Fields.SAVE_JOB_UPDATE, op -> { + JobUpdate update = op.getSaveJobUpdate().getJobUpdate(); + writeBehindJobUpdateStore.saveJobUpdate( + IJobUpdate.build(update), + Optional.fromNullable(op.getSaveJobUpdate().getLockToken())); }) - .put(Op._Fields.REMOVE_QUOTA, new Closure<Op>() { - @Override - public void execute(Op op) { - writeBehindQuotaStore.removeQuota(op.getRemoveQuota().getRole()); - } + .put(Op._Fields.SAVE_JOB_UPDATE_EVENT, op -> { + SaveJobUpdateEvent event = op.getSaveJobUpdateEvent(); + writeBehindJobUpdateStore.saveJobUpdateEvent( + IJobUpdateKey.build(event.getKey()), + IJobUpdateEvent.build(op.getSaveJobUpdateEvent().getEvent())); }) - .put(Op._Fields.SAVE_HOST_ATTRIBUTES, new Closure<Op>() { - @Override - public void execute(Op op) { - HostAttributes attributes = op.getSaveHostAttributes().getHostAttributes(); - // Prior to commit 5cf760b, the store would persist maintenance mode changes for - // unknown hosts. 5cf760b began rejecting these, but the replicated log may still - // contain entries with a null slave ID. - if (attributes.isSetSlaveId()) { - writeBehindAttributeStore.saveHostAttributes(IHostAttributes.build(attributes)); - } else { - LOG.info("Dropping host attributes with no slave ID: " + attributes); - } - } + .put(Op._Fields.SAVE_JOB_INSTANCE_UPDATE_EVENT, op -> { + SaveJobInstanceUpdateEvent event = op.getSaveJobInstanceUpdateEvent(); + writeBehindJobUpdateStore.saveJobInstanceUpdateEvent( + IJobUpdateKey.build(event.getKey()), + IJobInstanceUpdateEvent.build(op.getSaveJobInstanceUpdateEvent().getEvent())); }) - .put(Op._Fields.SAVE_LOCK, new Closure<Op>() { - @Override - public void execute(Op op) { - writeBehindLockStore.saveLock(ILock.build(op.getSaveLock().getLock())); - } - }) - .put(Op._Fields.REMOVE_LOCK, new Closure<Op>() { - @Override - public void execute(Op op) { - writeBehindLockStore.removeLock(ILockKey.build(op.getRemoveLock().getLockKey())); - } - }) - .put(Op._Fields.SAVE_JOB_UPDATE, new Closure<Op>() { - @Override - public void execute(Op op) { - JobUpdate update = op.getSaveJobUpdate().getJobUpdate(); - writeBehindJobUpdateStore.saveJobUpdate( - IJobUpdate.build(update), - Optional.fromNullable(op.getSaveJobUpdate().getLockToken())); - } - }) - .put(Op._Fields.SAVE_JOB_UPDATE_EVENT, new Closure<Op>() { - @Override - public void execute(Op op) { - SaveJobUpdateEvent event = op.getSaveJobUpdateEvent(); - writeBehindJobUpdateStore.saveJobUpdateEvent( - IJobUpdateKey.build(event.getKey()), - IJobUpdateEvent.build(op.getSaveJobUpdateEvent().getEvent())); - } - }) - .put(Op._Fields.SAVE_JOB_INSTANCE_UPDATE_EVENT, new Closure<Op>() { - @Override - public void execute(Op op) { - SaveJobInstanceUpdateEvent event = op.getSaveJobInstanceUpdateEvent(); - writeBehindJobUpdateStore.saveJobInstanceUpdateEvent( - IJobUpdateKey.build(event.getKey()), - IJobInstanceUpdateEvent.build(op.getSaveJobInstanceUpdateEvent().getEvent())); - } - }) - .put(Op._Fields.PRUNE_JOB_UPDATE_HISTORY, new Closure<Op>() { - @Override - public void execute(Op op) { - writeBehindJobUpdateStore.pruneHistory( - op.getPruneJobUpdateHistory().getPerJobRetainCount(), - op.getPruneJobUpdateHistory().getHistoryPruneThresholdMs()); - } - }).build(); + .put(Op._Fields.PRUNE_JOB_UPDATE_HISTORY, op -> writeBehindJobUpdateStore.pruneHistory( + op.getPruneJobUpdateHistory().getPerJobRetainCount(), + op.getPruneJobUpdateHistory().getHistoryPruneThresholdMs())).build(); } @Override @@ -477,19 +413,16 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore @Override public synchronized void start(final MutateWork.NoResult.Quiet initializationLogic) { - write(new MutateWork.NoResult.Quiet() { - @Override - public void execute(MutableStoreProvider unused) { - // Must have the underlying storage started so we can query it for the last checkpoint. - // We replay these entries in the forwarded storage system's transactions but not ours - we - // do not want to re-record these ops to the log. - recover(); - recovered = true; - - // Now that we're recovered we should let any mutations done in initializationLogic append - // to the log, so run it in one of our transactions. - write(initializationLogic); - } + write((NoResult.Quiet) (MutableStoreProvider unused) -> { + // Must have the underlying storage started so we can query it for the last checkpoint. + // We replay these entries in the forwarded storage system's transactions but not ours - we + // do not want to re-record these ops to the log. + recover(); + recovered = true; + + // Now that we're recovered we should let any mutations done in initializationLogic append + // to the log, so run it in one of our transactions. + write(initializationLogic); }); scheduleSnapshots(); @@ -502,19 +435,11 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore @Timed("scheduler_log_recover") void recover() throws RecoveryFailedException { - writeBehindStorage.bulkLoad(new MutateWork.NoResult.Quiet() { - @Override - public void execute(MutableStoreProvider storeProvider) { - try { - streamManager.readFromBeginning(new Closure<LogEntry>() { - @Override - public void execute(LogEntry logEntry) { - replay(logEntry); - } - }); - } catch (CodingException | InvalidPositionException | StreamAccessException e) { - throw new RecoveryFailedException(e); - } + writeBehindStorage.bulkLoad(storeProvider -> { + try { + streamManager.readFromBeginning(LogStorage.this::replay); + } catch (CodingException | InvalidPositionException | StreamAccessException e) { + throw new RecoveryFailedException(e); } }); } @@ -545,17 +470,14 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore private void scheduleSnapshots() { if (snapshotInterval.getValue() > 0) { - schedulingService.doEvery(snapshotInterval, new Runnable() { - @Override - public void run() { - try { - snapshot(); - } catch (StorageException e) { - if (e.getCause() == null) { - LOG.log(Level.WARNING, "StorageException when attempting to snapshot.", e); - } else { - LOG.log(Level.WARNING, e.getMessage(), e.getCause()); - } + schedulingService.doEvery(snapshotInterval, () -> { + try { + snapshot(); + } catch (StorageException e) { + if (e.getCause() == null) { + LOG.log(Level.WARNING, "StorageException when attempting to snapshot.", e); + } else { + LOG.log(Level.WARNING, e.getMessage(), e.getCause()); } } }); @@ -571,21 +493,16 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore */ @Timed("scheduler_log_snapshot") void doSnapshot() throws CodingException, InvalidPositionException, StreamAccessException { - write(new MutateWork.NoResult<CodingException>() { - @Override - public void execute(MutableStoreProvider unused) - throws CodingException, InvalidPositionException, StreamAccessException { - - LOG.info("Creating snapshot."); - Snapshot snapshot = snapshotStore.createSnapshot(); - persist(snapshot); - LOG.info("Snapshot complete." - + " host attrs: " + snapshot.getHostAttributesSize() - + ", cron jobs: " + snapshot.getCronJobsSize() - + ", locks: " + snapshot.getLocksSize() - + ", quota confs: " + snapshot.getQuotaConfigurationsSize() - + ", tasks: " + snapshot.getTasksSize()); - } + write((NoResult<CodingException>) (MutableStoreProvider unused) -> { + LOG.info("Creating snapshot."); + Snapshot snapshot = snapshotStore.createSnapshot(); + persist(snapshot); + LOG.info("Snapshot complete." + + " host attrs: " + snapshot.getHostAttributesSize() + + ", cron jobs: " + snapshot.getCronJobsSize() + + ", locks: " + snapshot.getLocksSize() + + ", quota confs: " + snapshot.getQuotaConfigurationsSize() + + ", tasks: " + snapshot.getTasksSize()); }); } @@ -608,21 +525,18 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore transaction = streamManager.startTransaction(); try { - return writeBehindStorage.write(new MutateWork<T, E>() { - @Override - public T apply(MutableStoreProvider unused) throws E { - T result = work.apply(writeAheadStorage); - try { - transaction.commit(); - } catch (CodingException e) { - throw new IllegalStateException( - "Problem encoding transaction operations to the log stream", e); - } catch (StreamAccessException e) { - throw new StorageException( - "There was a problem committing the transaction to the log.", e); - } - return result; + return writeBehindStorage.write(unused -> { + T result = work.apply(writeAheadStorage); + try { + transaction.commit(); + } catch (CodingException e) { + throw new IllegalStateException( + "Problem encoding transaction operations to the log stream", e); + } catch (StreamAccessException e) { + throw new StorageException( + "There was a problem committing the transaction to the log.", e); } + return result; }); } finally { transaction = null; http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotDeduplicator.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotDeduplicator.java b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotDeduplicator.java index 79536f1..de14413 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotDeduplicator.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotDeduplicator.java @@ -62,12 +62,7 @@ public interface SnapshotDeduplicator { private static final Logger LOG = Logger.getLogger(SnapshotDeduplicatorImpl.class.getName()); private static final Function<ScheduledTask, TaskConfig> SCHEDULED_TO_CONFIG = - new Function<ScheduledTask, TaskConfig>() { - @Override - public TaskConfig apply(ScheduledTask task) { - return task.getAssignedTask().getTask(); - } - }; + task -> task.getAssignedTask().getTask(); private static ScheduledTask deepCopyWithoutTaskConfig(ScheduledTask scheduledTask) { ScheduledTask scheduledTaskCopy = new ScheduledTask(); http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java index ed1fffe..61058c7 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java @@ -16,6 +16,7 @@ package org.apache.aurora.scheduler.storage.log; import java.util.Arrays; import java.util.Map; import java.util.logging.Logger; + import javax.inject.Inject; import com.google.common.base.Optional; @@ -39,7 +40,7 @@ import org.apache.aurora.scheduler.storage.JobUpdateStore; import org.apache.aurora.scheduler.storage.SnapshotStore; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; -import org.apache.aurora.scheduler.storage.Storage.MutateWork; +import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; import org.apache.aurora.scheduler.storage.Storage.StoreProvider; import org.apache.aurora.scheduler.storage.Storage.Volatile; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; @@ -246,28 +247,25 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> { public Snapshot createSnapshot() { // It's important to perform snapshot creation in a write lock to ensure all upstream callers // are correctly synchronized (e.g. during backup creation). - return storage.write(new MutateWork.Quiet<Snapshot>() { - @Override - public Snapshot apply(MutableStoreProvider storeProvider) { - Snapshot snapshot = new Snapshot(); - - // Capture timestamp to signify the beginning of a snapshot operation, apply after in case - // one of the field closures is mean and tries to apply a timestamp. - long timestamp = clock.nowMillis(); - for (SnapshotField field : SNAPSHOT_FIELDS) { - field.saveToSnapshot(storeProvider, snapshot); - } + return storage.write(storeProvider -> { + Snapshot snapshot = new Snapshot(); + + // Capture timestamp to signify the beginning of a snapshot operation, apply after in case + // one of the field closures is mean and tries to apply a timestamp. + long timestamp = clock.nowMillis(); + for (SnapshotField field : SNAPSHOT_FIELDS) { + field.saveToSnapshot(storeProvider, snapshot); + } - SchedulerMetadata metadata = new SchedulerMetadata() - .setFrameworkId(storeProvider.getSchedulerStore().fetchFrameworkId().orNull()) - .setVersion(CURRENT_API_VERSION); + SchedulerMetadata metadata = new SchedulerMetadata() + .setFrameworkId(storeProvider.getSchedulerStore().fetchFrameworkId().orNull()) + .setVersion(CURRENT_API_VERSION); - metadata.setDetails(buildInfo.getProperties()); + metadata.setDetails(buildInfo.getProperties()); - snapshot.setSchedulerMetadata(metadata); - snapshot.setTimestamp(timestamp); - return snapshot; - } + snapshot.setSchedulerMetadata(metadata); + snapshot.setTimestamp(timestamp); + return snapshot; }); } @@ -276,14 +274,11 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> { public void applySnapshot(final Snapshot snapshot) { requireNonNull(snapshot); - storage.write(new MutateWork.NoResult.Quiet() { - @Override - public void execute(MutableStoreProvider storeProvider) { - LOG.info("Restoring snapshot."); + storage.write((NoResult.Quiet) (MutableStoreProvider storeProvider) -> { + LOG.info("Restoring snapshot."); - for (SnapshotField field : SNAPSHOT_FIELDS) { - field.restoreFromSnapshot(storeProvider, snapshot); - } + for (SnapshotField field : SNAPSHOT_FIELDS) { + field.restoreFromSnapshot(storeProvider, snapshot); } }); } http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java index 072fe45..01448ae 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java @@ -73,19 +73,9 @@ class MemTaskStore implements TaskStore.Mutable { private final long slowQueryThresholdNanos = SLOW_QUERY_LOG_THRESHOLD.get().as(Time.NANOSECONDS); private static final Function<Query.Builder, Optional<Set<IJobKey>>> QUERY_TO_JOB_KEY = - new Function<Query.Builder, Optional<Set<IJobKey>>>() { - @Override - public Optional<Set<IJobKey>> apply(Query.Builder query) { - return JobKeys.from(query); - } - }; + JobKeys::from; private static final Function<Query.Builder, Optional<Set<String>>> QUERY_TO_SLAVE_HOST = - new Function<Query.Builder, Optional<Set<String>>>() { - @Override - public Optional<Set<String>> apply(Query.Builder query) { - return Optional.fromNullable(query.get().getSlaveHosts()); - } - }; + query -> Optional.fromNullable(query.get().getSlaveHosts()); // Since this class operates under the API and umbrella of {@link Storage}, it is expected to be // thread-safe but not necessarily strongly-consistent unless the externally-controlled storage @@ -149,13 +139,7 @@ class MemTaskStore implements TaskStore.Mutable { .toSet(); } - private final Function<IScheduledTask, Task> toTask = - new Function<IScheduledTask, Task>() { - @Override - public Task apply(IScheduledTask task) { - return new Task(task, configInterner); - } - }; + private final Function<IScheduledTask, Task> toTask = task -> new Task(task, configInterner); @Timed("mem_storage_save_tasks") @Override @@ -290,13 +274,7 @@ class MemTaskStore implements TaskStore.Mutable { return FluentIterable.from(from.get()).filter(queryFilter(query)); } - private static final Function<Task, IScheduledTask> TO_SCHEDULED = - new Function<Task, IScheduledTask>() { - @Override - public IScheduledTask apply(Task task) { - return task.storedTask; - } - }; + private static final Function<Task, IScheduledTask> TO_SCHEDULED = task -> task.storedTask; private static final Function<Task, String> TO_ID = Functions.compose(Tasks::id, TO_SCHEDULED); http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/storage/mem/Util.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/Util.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/Util.java index 65043fe..c28fb65 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/mem/Util.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/Util.java @@ -13,8 +13,6 @@ */ package org.apache.aurora.scheduler.storage.mem; -import javax.annotation.Nullable; - import com.google.common.base.Function; import org.apache.thrift.TBase; @@ -36,17 +34,14 @@ final class Util { * @return A copier for the provided type of thrift structs. */ static <T extends TBase<T, ?>> Function<T, T> deepCopier() { - return new Function<T, T>() { - @Override - public T apply(@Nullable T input) { - if (input == null) { - return null; - } - - @SuppressWarnings("unchecked") - T t = (T) input.deepCopy(); - return t; + return input -> { + if (input == null) { + return null; } + + @SuppressWarnings("unchecked") + T t = (T) input.deepCopy(); + return t; }; } } http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/testing/FakeStatsProvider.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/testing/FakeStatsProvider.java b/src/main/java/org/apache/aurora/scheduler/testing/FakeStatsProvider.java index 5d0eaba..f9d0baf 100644 --- a/src/main/java/org/apache/aurora/scheduler/testing/FakeStatsProvider.java +++ b/src/main/java/org/apache/aurora/scheduler/testing/FakeStatsProvider.java @@ -16,7 +16,6 @@ package org.apache.aurora.scheduler.testing; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; -import com.google.common.base.Function; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; @@ -48,12 +47,7 @@ public class FakeStatsProvider implements StatsProvider { public Map<String, ? extends Number> getAllValues() { return ImmutableMap.copyOf(Maps.transformValues( stats, - new Function<Supplier<? extends Number>, Number>() { - @Override - public Number apply(Supplier<? extends Number> supplier) { - return supplier.get(); - } - })); + Supplier::get)); } /** @@ -69,12 +63,7 @@ public class FakeStatsProvider implements StatsProvider { @Override public AtomicLong makeCounter(String name) { final AtomicLong counter = new AtomicLong(); - stats.put(name, new Supplier<Long>() { - @Override - public Long get() { - return counter.get(); - } - }); + stats.put(name, counter::get); return counter; } http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImpl.java b/src/main/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImpl.java index 358b80c..c0e8a20 100644 --- a/src/main/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImpl.java +++ b/src/main/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImpl.java @@ -85,8 +85,6 @@ import org.apache.aurora.scheduler.quota.QuotaInfo; import org.apache.aurora.scheduler.quota.QuotaManager; import org.apache.aurora.scheduler.state.LockManager; import org.apache.aurora.scheduler.storage.Storage; -import org.apache.aurora.scheduler.storage.Storage.StoreProvider; -import org.apache.aurora.scheduler.storage.Storage.Work.Quiet; import org.apache.aurora.scheduler.storage.entities.IAssignedTask; import org.apache.aurora.scheduler.storage.entities.IJobConfiguration; import org.apache.aurora.scheduler.storage.entities.IJobKey; @@ -111,15 +109,10 @@ import static org.apache.aurora.scheduler.thrift.Responses.ok; class ReadOnlySchedulerImpl implements ReadOnlyScheduler.Iface { private static final Function<Entry<ITaskConfig, Collection<Integer>>, ConfigGroup> TO_GROUP = - new Function<Entry<ITaskConfig, Collection<Integer>>, ConfigGroup>() { - @Override - public ConfigGroup apply(Entry<ITaskConfig, Collection<Integer>> input) { - return new ConfigGroup( - input.getKey().newBuilder(), - ImmutableSet.copyOf(input.getValue()), - IRange.toBuildersSet(convertRanges(toRanges(input.getValue())))); - } - }; + input -> new ConfigGroup( + input.getKey().newBuilder(), + ImmutableSet.copyOf(input.getValue()), + IRange.toBuildersSet(convertRanges(toRanges(input.getValue())))); private final Storage storage; private final NearestFit nearestFit; @@ -167,12 +160,9 @@ class ReadOnlySchedulerImpl implements ReadOnlyScheduler.Iface { public Response getTasksWithoutConfigs(TaskQuery query) { List<ScheduledTask> tasks = Lists.transform( getTasks(query), - new Function<ScheduledTask, ScheduledTask>() { - @Override - public ScheduledTask apply(ScheduledTask task) { - task.getAssignedTask().getTask().unsetExecutorConfig(); - return task; - } + task -> { + task.getAssignedTask().getTask().unsetExecutorConfig(); + return task; }); return ok(Result.scheduleStatusResult(new ScheduleStatusResult().setTasks(tasks))); @@ -191,25 +181,17 @@ class ReadOnlySchedulerImpl implements ReadOnlyScheduler.Iface { query.setStatuses(ImmutableSet.of(ScheduleStatus.PENDING)); Set<PendingReason> reasons = FluentIterable.from(getTasks(query)) - .transform(new Function<ScheduledTask, PendingReason>() { - @Override - public PendingReason apply(ScheduledTask scheduledTask) { - TaskGroupKey groupKey = TaskGroupKey.from( - ITaskConfig.build(scheduledTask.getAssignedTask().getTask())); - - String reason = Joiner.on(',').join(Iterables.transform( - nearestFit.getNearestFit(groupKey), - new Function<Veto, String>() { - @Override - public String apply(Veto veto) { - return veto.getReason(); - } - })); - - return new PendingReason() - .setTaskId(scheduledTask.getAssignedTask().getTaskId()) - .setReason(reason); - } + .transform(scheduledTask -> { + TaskGroupKey groupKey = TaskGroupKey.from( + ITaskConfig.build(scheduledTask.getAssignedTask().getTask())); + + String reason = Joiner.on(',').join(Iterables.transform( + nearestFit.getNearestFit(groupKey), + Veto::getReason)); + + return new PendingReason() + .setTaskId(scheduledTask.getAssignedTask().getTaskId()) + .setReason(reason); }).toSet(); return ok(Result.getPendingReasonResult(new GetPendingReasonResult(reasons))); @@ -233,12 +215,9 @@ class ReadOnlySchedulerImpl implements ReadOnlyScheduler.Iface { @Override public Response getRoleSummary() { - Multimap<String, IJobKey> jobsByRole = storage.read(new Quiet<Multimap<String, IJobKey>>() { - @Override - public Multimap<String, IJobKey> apply(StoreProvider storeProvider) { - return Multimaps.index(storeProvider.getTaskStore().getJobKeys(), IJobKey::getRole); - } - }); + Multimap<String, IJobKey> jobsByRole = storage.read( + storeProvider -> + Multimaps.index(storeProvider.getTaskStore().getJobKeys(), IJobKey::getRole)); Multimap<String, IJobKey> cronJobsByRole = Multimaps.index( Iterables.transform(Storage.Util.fetchCronJobs(storage), IJobConfiguration::getKey), @@ -246,15 +225,10 @@ class ReadOnlySchedulerImpl implements ReadOnlyScheduler.Iface { Set<RoleSummary> summaries = FluentIterable.from( Sets.union(jobsByRole.keySet(), cronJobsByRole.keySet())) - .transform(new Function<String, RoleSummary>() { - @Override - public RoleSummary apply(String role) { - return new RoleSummary( - role, - jobsByRole.get(role).size(), - cronJobsByRole.get(role).size()); - } - }) + .transform(role -> new RoleSummary( + role, + jobsByRole.get(role).size(), + cronJobsByRole.get(role).size())) .toSet(); return ok(Result.roleSummaryResult(new RoleSummaryResult(summaries))); @@ -264,22 +238,19 @@ class ReadOnlySchedulerImpl implements ReadOnlyScheduler.Iface { public Response getJobSummary(@Nullable String maybeNullRole) { Optional<String> ownerRole = Optional.fromNullable(maybeNullRole); - final Multimap<IJobKey, IScheduledTask> tasks = getTasks(maybeRoleScoped(ownerRole)); - final Map<IJobKey, IJobConfiguration> jobs = getJobs(ownerRole, tasks); - - Function<IJobKey, JobSummary> makeJobSummary = new Function<IJobKey, JobSummary>() { - @Override - public JobSummary apply(IJobKey jobKey) { - IJobConfiguration job = jobs.get(jobKey); - JobSummary summary = new JobSummary() - .setJob(job.newBuilder()) - .setStats(Jobs.getJobStats(tasks.get(jobKey)).newBuilder()); - - return Strings.isNullOrEmpty(job.getCronSchedule()) - ? summary - : summary.setNextCronRunMs( - cronPredictor.predictNextRun(CrontabEntry.parse(job.getCronSchedule())).getTime()); - } + Multimap<IJobKey, IScheduledTask> tasks = getTasks(maybeRoleScoped(ownerRole)); + Map<IJobKey, IJobConfiguration> jobs = getJobs(ownerRole, tasks); + + Function<IJobKey, JobSummary> makeJobSummary = jobKey -> { + IJobConfiguration job = jobs.get(jobKey); + JobSummary summary = new JobSummary() + .setJob(job.newBuilder()) + .setStats(Jobs.getJobStats(tasks.get(jobKey)).newBuilder()); + + return Strings.isNullOrEmpty(job.getCronSchedule()) + ? summary + : summary.setNextCronRunMs( + cronPredictor.predictNextRun(CrontabEntry.parse(job.getCronSchedule())).getTime()); }; ImmutableSet<JobSummary> jobSummaries = @@ -299,21 +270,18 @@ class ReadOnlySchedulerImpl implements ReadOnlyScheduler.Iface { } @Override - public Response getQuota(final String ownerRole) { + public Response getQuota(String ownerRole) { MorePreconditions.checkNotBlank(ownerRole); - return storage.read(new Quiet<Response>() { - @Override - public Response apply(StoreProvider storeProvider) { - QuotaInfo quotaInfo = quotaManager.getQuotaInfo(ownerRole, storeProvider); - GetQuotaResult result = new GetQuotaResult(quotaInfo.getQuota().newBuilder()) - .setProdSharedConsumption(quotaInfo.getProdSharedConsumption().newBuilder()) - .setProdDedicatedConsumption(quotaInfo.getProdDedicatedConsumption().newBuilder()) - .setNonProdSharedConsumption(quotaInfo.getNonProdSharedConsumption().newBuilder()) - .setNonProdDedicatedConsumption( - quotaInfo.getNonProdDedicatedConsumption().newBuilder()); - - return ok(Result.getQuotaResult(result)); - } + return storage.read(storeProvider -> { + QuotaInfo quotaInfo = quotaManager.getQuotaInfo(ownerRole, storeProvider); + GetQuotaResult result = new GetQuotaResult(quotaInfo.getQuota().newBuilder()) + .setProdSharedConsumption(quotaInfo.getProdSharedConsumption().newBuilder()) + .setProdDedicatedConsumption(quotaInfo.getProdDedicatedConsumption().newBuilder()) + .setNonProdSharedConsumption(quotaInfo.getNonProdSharedConsumption().newBuilder()) + .setNonProdDedicatedConsumption( + quotaInfo.getNonProdDedicatedConsumption().newBuilder()); + + return ok(Result.getQuotaResult(result)); }); } @@ -324,28 +292,20 @@ class ReadOnlySchedulerImpl implements ReadOnlyScheduler.Iface { } @Override - public Response getJobUpdateSummaries(final JobUpdateQuery mutableQuery) { - final IJobUpdateQuery query = IJobUpdateQuery.build(requireNonNull(mutableQuery)); + public Response getJobUpdateSummaries(JobUpdateQuery mutableQuery) { + IJobUpdateQuery query = IJobUpdateQuery.build(requireNonNull(mutableQuery)); return ok(Result.getJobUpdateSummariesResult( - new GetJobUpdateSummariesResult().setUpdateSummaries(IJobUpdateSummary.toBuildersList( - storage.read(new Quiet<List<IJobUpdateSummary>>() { - @Override - public List<IJobUpdateSummary> apply(StoreProvider storeProvider) { - return storeProvider.getJobUpdateStore().fetchJobUpdateSummaries(query); - } - }))))); + new GetJobUpdateSummariesResult() + .setUpdateSummaries(IJobUpdateSummary.toBuildersList(storage.read( + storeProvider -> + storeProvider.getJobUpdateStore().fetchJobUpdateSummaries(query)))))); } @Override public Response getJobUpdateDetails(JobUpdateKey mutableKey) { - final IJobUpdateKey key = IJobUpdateKey.build(mutableKey); + IJobUpdateKey key = IJobUpdateKey.build(mutableKey); Optional<IJobUpdateDetails> details = - storage.read(new Quiet<Optional<IJobUpdateDetails>>() { - @Override - public Optional<IJobUpdateDetails> apply(StoreProvider storeProvider) { - return storeProvider.getJobUpdateStore().fetchJobUpdateDetails(key); - } - }); + storage.read(storeProvider -> storeProvider.getJobUpdateStore().fetchJobUpdateDetails(key)); if (details.isPresent()) { return ok(Result.getJobUpdateDetailsResult( @@ -357,43 +317,40 @@ class ReadOnlySchedulerImpl implements ReadOnlyScheduler.Iface { @Override public Response getJobUpdateDiff(JobUpdateRequest mutableRequest) { - final IJobUpdateRequest request = IJobUpdateRequest.build(requireNonNull(mutableRequest)); - final IJobKey job = request.getTaskConfig().getJob(); - - return storage.read(new Quiet<Response>() { - @Override - public Response apply(StoreProvider storeProvider) throws RuntimeException { - if (storeProvider.getCronJobStore().fetchJob(job).isPresent()) { - return invalidRequest(NO_CRON); - } - - JobDiff diff = JobDiff.compute( - storeProvider.getTaskStore(), - job, - JobDiff.asMap(request.getTaskConfig(), request.getInstanceCount()), - request.getSettings().getUpdateOnlyTheseInstances()); - - Map<Integer, ITaskConfig> replaced = diff.getReplacedInstances(); - Map<Integer, ITaskConfig> replacements = Maps.asMap( - diff.getReplacementInstances(), - Functions.constant(request.getTaskConfig())); - - Map<Integer, ITaskConfig> add = Maps.filterKeys( - replacements, - Predicates.in(Sets.difference(replacements.keySet(), replaced.keySet()))); - Map<Integer, ITaskConfig> remove = Maps.filterKeys( - replaced, - Predicates.in(Sets.difference(replaced.keySet(), replacements.keySet()))); - Map<Integer, ITaskConfig> update = Maps.filterKeys( - replaced, - Predicates.in(Sets.intersection(replaced.keySet(), replacements.keySet()))); - - return ok(Result.getJobUpdateDiffResult(new GetJobUpdateDiffResult() - .setAdd(instancesToConfigGroups(add)) - .setRemove(instancesToConfigGroups(remove)) - .setUpdate(instancesToConfigGroups(update)) - .setUnchanged(instancesToConfigGroups(diff.getUnchangedInstances())))); + IJobUpdateRequest request = IJobUpdateRequest.build(requireNonNull(mutableRequest)); + IJobKey job = request.getTaskConfig().getJob(); + + return storage.read(storeProvider -> { + if (storeProvider.getCronJobStore().fetchJob(job).isPresent()) { + return invalidRequest(NO_CRON); } + + JobDiff diff = JobDiff.compute( + storeProvider.getTaskStore(), + job, + JobDiff.asMap(request.getTaskConfig(), request.getInstanceCount()), + request.getSettings().getUpdateOnlyTheseInstances()); + + Map<Integer, ITaskConfig> replaced = diff.getReplacedInstances(); + Map<Integer, ITaskConfig> replacements = Maps.asMap( + diff.getReplacementInstances(), + Functions.constant(request.getTaskConfig())); + + Map<Integer, ITaskConfig> add = Maps.filterKeys( + replacements, + Predicates.in(Sets.difference(replacements.keySet(), replaced.keySet()))); + Map<Integer, ITaskConfig> remove = Maps.filterKeys( + replaced, + Predicates.in(Sets.difference(replaced.keySet(), replacements.keySet()))); + Map<Integer, ITaskConfig> update = Maps.filterKeys( + replaced, + Predicates.in(Sets.intersection(replaced.keySet(), replacements.keySet()))); + + return ok(Result.getJobUpdateDiffResult(new GetJobUpdateDiffResult() + .setAdd(instancesToConfigGroups(add)) + .setRemove(instancesToConfigGroups(remove)) + .setUpdate(instancesToConfigGroups(update)) + .setUnchanged(instancesToConfigGroups(diff.getUnchangedInstances())))); }); } @@ -435,23 +392,18 @@ class ReadOnlySchedulerImpl implements ReadOnlyScheduler.Iface { Map<IJobKey, IJobConfiguration> jobs = Maps.newHashMap(); jobs.putAll(Maps.transformEntries(tasks.asMap(), - new Maps.EntryTransformer<IJobKey, Collection<IScheduledTask>, IJobConfiguration>() { - @Override - public IJobConfiguration transformEntry( - IJobKey jobKey, - Collection<IScheduledTask> tasks) { - - // Pick the latest transitioned task for each immediate job since the job can be in the - // middle of an update or some shards have been selectively created. - TaskConfig mostRecentTaskConfig = - Tasks.getLatestActiveTask(tasks).getAssignedTask().getTask().newBuilder(); - - return IJobConfiguration.build(new JobConfiguration() - .setKey(jobKey.newBuilder()) - .setOwner(mostRecentTaskConfig.getOwner()) - .setTaskConfig(mostRecentTaskConfig) - .setInstanceCount(tasks.size())); - } + (jobKey, tasks1) -> { + + // Pick the latest transitioned task for each immediate job since the job can be in the + // middle of an update or some shards have been selectively created. + TaskConfig mostRecentTaskConfig = + Tasks.getLatestActiveTask(tasks1).getAssignedTask().getTask().newBuilder(); + + return IJobConfiguration.build(new JobConfiguration() + .setKey(jobKey.newBuilder()) + .setOwner(mostRecentTaskConfig.getOwner()) + .setTaskConfig(mostRecentTaskConfig) + .setInstanceCount(tasks1.size())); })); // Get cron jobs directly from the manager. Do this after querying the task store so the real
