http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/stats/ResourceCounter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/stats/ResourceCounter.java b/src/main/java/org/apache/aurora/scheduler/stats/ResourceCounter.java
index c8ef858..5231e9f 100644
--- a/src/main/java/org/apache/aurora/scheduler/stats/ResourceCounter.java
+++ b/src/main/java/org/apache/aurora/scheduler/stats/ResourceCounter.java
@@ -34,8 +34,6 @@ import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.StorageException;
-import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.Work;
 import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 
@@ -61,12 +59,7 @@ public class ResourceCounter {
   }
 
   private static final Function<MetricType, GlobalMetric> TO_GLOBAL_METRIC =
-      new Function<MetricType, GlobalMetric>() {
-        @Override
-        public GlobalMetric apply(MetricType type) {
-          return new GlobalMetric(type);
-        }
-      };
+      GlobalMetric::new;
 
   /**
    * Computes totals for each of the {@link MetricType}s.
@@ -94,15 +87,12 @@ public class ResourceCounter {
    * @throws StorageException if there was a problem fetching quotas from storage.
    */
   public Metric computeQuotaAllocationTotals() throws StorageException {
-    return storage.read(new Work.Quiet<Metric>() {
-      @Override
-      public Metric apply(StoreProvider storeProvider) {
-        Metric allocation = new Metric();
-        for (IResourceAggregate quota : storeProvider.getQuotaStore().fetchQuotas().values()) {
-          allocation.accumulate(quota);
-        }
-        return allocation;
+    return storage.read(storeProvider -> {
+      Metric allocation = new Metric();
+      for (IResourceAggregate quota : storeProvider.getQuotaStore().fetchQuotas().values()) {
+        allocation.accumulate(quota);
       }
+      return allocation;
     });
   }
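
The hunks above are representative of the whole commit: each single-method anonymous class (Guava Function, Supplier, Predicate, Runnable, and the storage Work/MutateWork types) collapses into a lambda, method reference, or constructor reference. A minimal, self-contained sketch of that pattern, using a stand-in GlobalMetric class rather than the Aurora type:

    // Sketch only; GlobalMetric here is a stand-in, not the Aurora class.
    import com.google.common.base.Function;

    final class ConversionSketch {
      static final class GlobalMetric {
        final String type;
        GlobalMetric(String type) { this.type = type; }
      }

      // Before: anonymous inner class spelling out apply().
      static final Function<String, GlobalMetric> BEFORE =
          new Function<String, GlobalMetric>() {
            @Override
            public GlobalMetric apply(String type) {
              return new GlobalMetric(type);
            }
          };

      // After: a constructor reference with identical behavior.
      static final Function<String, GlobalMetric> AFTER = GlobalMetric::new;

      private ConversionSketch() { }
    }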
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/stats/SlotSizeCounter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/stats/SlotSizeCounter.java b/src/main/java/org/apache/aurora/scheduler/stats/SlotSizeCounter.java
index ced6b67..c9e57ec 100644
--- a/src/main/java/org/apache/aurora/scheduler/stats/SlotSizeCounter.java
+++ b/src/main/java/org/apache/aurora/scheduler/stats/SlotSizeCounter.java
@@ -126,12 +126,8 @@ class SlotSizeCounter implements Runnable {
   }
 
   private int countSlots(Iterable<IResourceAggregate> slots, final IResourceAggregate slotSize) {
-    Function<IResourceAggregate, Integer> counter = new Function<IResourceAggregate, Integer>() {
-      @Override
-      public Integer apply(IResourceAggregate machineSlack) {
-        return ResourceAggregates.divide(machineSlack, slotSize);
-      }
-    };
+    Function<IResourceAggregate, Integer> counter =
+        machineSlack -> ResourceAggregates.divide(machineSlack, slotSize);
 
     int sum = 0;
     for (int slotCount : FluentIterable.from(slots).transform(counter)) {

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/stats/StatsModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/stats/StatsModule.java b/src/main/java/org/apache/aurora/scheduler/stats/StatsModule.java
index 6399e78..4767ef1 100644
--- a/src/main/java/org/apache/aurora/scheduler/stats/StatsModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/stats/StatsModule.java
@@ -61,12 +61,7 @@ public class StatsModule extends AbstractModule {
     bind(TimeSeriesRepositoryImpl.class).in(Singleton.class);
 
     bind(new TypeLiteral<Supplier<Iterable<Stat<?>>>>() { }).toInstance(
-        new Supplier<Iterable<Stat<?>>>() {
-          @Override
-          public Iterable<Stat<?>> get() {
-            return Stats.getVariables();
-          }
-        }
+        Stats::getVariables
     );
 
     SchedulerServicesModule.addAppStartupServiceBinding(binder())

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java
index 2136154..6a5069f 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java
@@ -30,6 +30,7 @@ import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult.Quiet;
 import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -91,14 +92,11 @@ public class CallOrderEnforcingStorage implements NonVolatileStorage {
     checkInState(State.PREPARED);
     wrapped.start(initializationLogic);
     stateMachine.transition(State.READY);
-    wrapped.write(new MutateWork.NoResult.Quiet() {
-      @Override
-      public void execute(MutableStoreProvider storeProvider) {
-        Iterable<IScheduledTask> tasks = Tasks.LATEST_ACTIVITY.sortedCopy(
-            storeProvider.getTaskStore().fetchTasks(Query.unscoped()));
-        for (IScheduledTask task : tasks) {
-          eventSink.post(TaskStateChange.initialized(task));
-        }
+    wrapped.write((NoResult.Quiet) (MutableStoreProvider storeProvider) -> {
+      Iterable<IScheduledTask> tasks = Tasks.LATEST_ACTIVITY.sortedCopy(
+          storeProvider.getTaskStore().fetchTasks(Query.unscoped()));
+      for (IScheduledTask task : tasks) {
+        eventSink.post(TaskStateChange.initialized(task));
       }
     });
   }
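
Note the explicit (NoResult.Quiet) cast in the lambda above (and in several later files). It appears to be needed because the write(...) parameter is the more general MutateWork type, whose abstract method returns a value, so a void block lambda cannot target it directly; the cast pins the lambda to the void NoResult.Quiet sub-interface. A small sketch of that Java target-typing rule with hypothetical Work/QuietWork interfaces, not the Aurora types:

    // Hypothetical types illustrating why the cast is needed.
    final class TargetTypeSketch {
      interface Work<T> {
        T apply(String store);
      }

      // Sub-interface whose only abstract method is void; apply() is defaulted,
      // so a void-bodied lambda can implement it.
      interface QuietWork extends Work<Void> {
        void execute(String store);

        @Override
        default Void apply(String store) {
          execute(store);
          return null;
        }
      }

      static <T> T write(Work<T> work) {
        return work.apply("store");
      }

      public static void main(String[] args) {
        // Without the cast the lambda would have to implement Work<T> and return a
        // value; the cast selects the void sub-interface, mirroring (NoResult.Quiet).
        write((QuietWork) store -> System.out.println("writing to " + store));
      }
    }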

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
index f699ba5..6109158 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
@@ -292,21 +292,11 @@ public interface Storage {
      * @return Tasks returned from the query.
      */
     public static Iterable<IScheduledTask> fetchTasks(Storage storage, final Builder query) {
-      return storage.read(new Work.Quiet<Iterable<IScheduledTask>>() {
-        @Override
-        public Iterable<IScheduledTask> apply(StoreProvider storeProvider) {
-          return storeProvider.getTaskStore().fetchTasks(query);
-        }
-      });
+      return storage.read(storeProvider -> storeProvider.getTaskStore().fetchTasks(query));
     }
 
     public static Iterable<IJobConfiguration> fetchCronJobs(Storage storage) {
-      return storage.read(new Work.Quiet<Iterable<IJobConfiguration>>() {
-        @Override
-        public Iterable<IJobConfiguration> apply(Storage.StoreProvider storeProvider) {
-          return storeProvider.getCronJobStore().fetchJobs();
-        }
-      });
+      return storage.read(storeProvider -> storeProvider.getCronJobStore().fetchJobs());
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/storage/StorageBackfill.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/StorageBackfill.java b/src/main/java/org/apache/aurora/scheduler/storage/StorageBackfill.java
index a0483f4..5401a28 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/StorageBackfill.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/StorageBackfill.java
@@ -24,7 +24,6 @@ import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import org.apache.aurora.scheduler.storage.TaskStore.Mutable.TaskMutation;
 import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -75,13 +74,10 @@ public final class StorageBackfill {
     // Backfilling job keys has to be done in a separate transaction to ensure follow up scoped
     // Query calls work against upgraded MemTaskStore, which does not support deprecated fields.
     LOG.info("Backfilling task config job keys.");
-    storeProvider.getUnsafeTaskStore().mutateTasks(Query.unscoped(), new TaskMutation() {
-      @Override
-      public IScheduledTask apply(final IScheduledTask task) {
-        ScheduledTask builder = task.newBuilder();
-        populateJobKey(builder.getAssignedTask().getTask(), BACKFILLED_TASK_CONFIG_KEYS);
-        return IScheduledTask.build(builder);
-      }
+    storeProvider.getUnsafeTaskStore().mutateTasks(Query.unscoped(), task -> {
+      ScheduledTask builder = task.newBuilder();
+      populateJobKey(builder.getAssignedTask().getTask(), BACKFILLED_TASK_CONFIG_KEYS);
+      return IScheduledTask.build(builder);
     });
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java b/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
index 142e4ac..62639c4 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
@@ -118,47 +118,44 @@ public interface TaskStore {
     }
 
     public static Predicate<IScheduledTask> queryFilter(final Query.Builder queryBuilder) {
-      return new Predicate<IScheduledTask>() {
-        @Override
-        public boolean apply(IScheduledTask task) {
-          TaskQuery query = queryBuilder.get();
-          ITaskConfig config = task.getAssignedTask().getTask();
-          // TODO(wfarner): Investigate why blank inputs are treated specially for the role field.
-          if (query.getRole() != null
-              && !WHITESPACE.matchesAllOf(query.getRole())
-              && !query.getRole().equals(config.getJob().getRole())) {
-            return false;
-          }
-          if (query.getEnvironment() != null
-              && !query.getEnvironment().equals(config.getEnvironment())) {
-            return false;
-          }
-          if (query.getJobName() != null && !query.getJobName().equals(config.getJobName())) {
-            return false;
-          }
-
-          if (query.getJobKeysSize() > 0
-              && !query.getJobKeys().contains(config.getJob().newBuilder())) {
-            return false;
-          }
-          if (query.getTaskIds() != null && !query.getTaskIds().contains(Tasks.id(task))) {
-            return false;
-          }
-
-          if (query.getStatusesSize() > 0 && !query.getStatuses().contains(task.getStatus())) {
-            return false;
-          }
-          if (query.getSlaveHostsSize() > 0
-              && !query.getSlaveHosts().contains(task.getAssignedTask().getSlaveHost())) {
-            return false;
-          }
-          if (query.getInstanceIdsSize() > 0
-              && !query.getInstanceIds().contains(task.getAssignedTask().getInstanceId())) {
-            return false;
-          }
-
-          return true;
+      return task -> {
+        TaskQuery query = queryBuilder.get();
+        ITaskConfig config = task.getAssignedTask().getTask();
+        // TODO(wfarner): Investigate why blank inputs are treated specially for the role field.
+        if (query.getRole() != null
+            && !WHITESPACE.matchesAllOf(query.getRole())
+            && !query.getRole().equals(config.getJob().getRole())) {
+          return false;
         }
+        if (query.getEnvironment() != null
+            && !query.getEnvironment().equals(config.getEnvironment())) {
+          return false;
+        }
+        if (query.getJobName() != null && !query.getJobName().equals(config.getJobName())) {
+          return false;
+        }
+
+        if (query.getJobKeysSize() > 0
+            && !query.getJobKeys().contains(config.getJob().newBuilder())) {
+          return false;
+        }
+        if (query.getTaskIds() != null && !query.getTaskIds().contains(Tasks.id(task))) {
+          return false;
+        }
+
+        if (query.getStatusesSize() > 0 && !query.getStatuses().contains(task.getStatus())) {
+          return false;
+        }
+        if (query.getSlaveHostsSize() > 0
+            && !query.getSlaveHosts().contains(task.getAssignedTask().getSlaveHost())) {
+          return false;
+        }
+        if (query.getInstanceIdsSize() > 0
+            && !query.getInstanceIds().contains(task.getAssignedTask().getInstanceId())) {
+          return false;
+        }
+
+        return true;
       };
     }
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java b/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java
index 18daa05..0f0218c 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java
@@ -34,7 +34,7 @@ import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.storage.DistributedSnapshotStore;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 
 import static java.util.Objects.requireNonNull;
@@ -190,15 +190,12 @@ public interface Recovery {
       }
 
       void commit() {
-        primaryStorage.write(new MutateWork.NoResult.Quiet() {
-          @Override
-          public void execute(MutableStoreProvider storeProvider) {
-            try {
-              distributedStore.persist(tempStorage.toSnapshot());
-              shutDownNow.execute();
-            } catch (CodingException e) {
-              throw new IllegalStateException("Failed to encode snapshot.", e);
-            }
+        primaryStorage.write((NoResult.Quiet) (MutableStoreProvider storeProvider) -> {
+          try {
+            distributedStore.persist(tempStorage.toSnapshot());
+            shutDownNow.execute();
+          } catch (CodingException e) {
+            throw new IllegalStateException("Failed to encode snapshot.", e);
           }
         });
       }

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/storage/backup/StorageBackup.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/backup/StorageBackup.java b/src/main/java/org/apache/aurora/scheduler/storage/backup/StorageBackup.java
index c4e18d4..2cd8793 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/backup/StorageBackup.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/backup/StorageBackup.java
@@ -138,12 +138,7 @@ public interface StorageBackup {
     public Snapshot createSnapshot() {
       final Snapshot snapshot = delegate.createSnapshot();
       if (clock.nowMillis() >= (lastBackupMs + backupIntervalMs)) {
-        executor.execute(new Runnable() {
-          @Override
-          public void run() {
-            save(snapshot);
-          }
-        });
+        executor.execute(() -> save(snapshot));
       }
       return snapshot;
     }
@@ -209,20 +204,10 @@ public interface StorageBackup {
       }
     }
 
-    private static final FilenameFilter BACKUP_FILTER = new FilenameFilter() {
-      @Override
-      public boolean accept(File file, String s) {
-        return s.startsWith(FILE_PREFIX);
-      }
-    };
+    private static final FilenameFilter BACKUP_FILTER = (file, s) -> s.startsWith(FILE_PREFIX);
 
     @VisibleForTesting
-    static final Function<File, String> FILE_NAME = new Function<File, String>() {
-      @Override
-      public String apply(File file) {
-        return file.getName();
-      }
-    };
+    static final Function<File, String> FILE_NAME = File::getName;
 
     @Override
     public void applySnapshot(Snapshot snapshot) {

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
index 6af059d..f683f79 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
@@ -26,9 +26,7 @@ import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.storage.SnapshotStore;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork;
-import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.Work;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
 import org.apache.aurora.scheduler.storage.db.DbUtil;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.log.SnapshotStoreImpl;
@@ -82,25 +80,17 @@ interface TemporaryStorage {
       return new TemporaryStorage() {
         @Override
         public void deleteTasks(final Query.Builder query) {
-          storage.write(new MutateWork.NoResult.Quiet() {
-            @Override
-            public void execute(MutableStoreProvider storeProvider) {
-              Set<String> ids = FluentIterable.from(storeProvider.getTaskStore().fetchTasks(query))
-                  .transform(Tasks::id)
-                  .toSet();
-              storeProvider.getUnsafeTaskStore().deleteTasks(ids);
-            }
+          storage.write((NoResult.Quiet) (MutableStoreProvider storeProvider) -> {
+            Set<String> ids = FluentIterable.from(storeProvider.getTaskStore().fetchTasks(query))
+                .transform(Tasks::id)
+                .toSet();
+            storeProvider.getUnsafeTaskStore().deleteTasks(ids);
           });
         }
 
         @Override
         public Iterable<IScheduledTask> fetchTasks(final Query.Builder query) {
-          return storage.read(new Work.Quiet<Iterable<IScheduledTask>>() {
-            @Override
-            public Iterable<IScheduledTask> apply(StoreProvider storeProvider) {
-              return storeProvider.getTaskStore().fetchTasks(query);
-            }
-          });
+          return storage.read(storeProvider -> storeProvider.getTaskStore().fetchTasks(query));
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/storage/db/DbAttributeStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbAttributeStore.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbAttributeStore.java
index ee3ff6c..6901098 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbAttributeStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbAttributeStore.java
@@ -79,12 +79,8 @@ class DbAttributeStore implements AttributeStore.Mutable {
     return true;
   }
 
-  private static final Predicate<IAttribute> EMPTY_VALUES = new Predicate<IAttribute>() {
-    @Override
-    public boolean apply(IAttribute attribute) {
-      return attribute.getValues().isEmpty();
-    }
-  };
+  private static final Predicate<IAttribute> EMPTY_VALUES =
+      attribute -> attribute.getValues().isEmpty();
 
   @Timed("attribute_store_fetch_one")
   @Override

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/storage/db/DbJobUpdateStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbJobUpdateStore.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbJobUpdateStore.java
index 7652132..d2673e6 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbJobUpdateStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbJobUpdateStore.java
@@ -158,12 +158,7 @@ public class DbJobUpdateStore implements JobUpdateStore.Mutable {
   }
 
   private static final Function<PruneVictim, IJobUpdateKey> GET_UPDATE_KEY =
-      new Function<PruneVictim, IJobUpdateKey>() {
-        @Override
-        public IJobUpdateKey apply(PruneVictim victim) {
-          return IJobUpdateKey.build(victim.getUpdate());
-        }
-      };
+      victim -> IJobUpdateKey.build(victim.getUpdate());
 
   @Timed("job_update_store_prune_history")
   @Override

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
index dd7e1d3..7674b8a 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
@@ -155,14 +155,11 @@ class DbStorage extends AbstractIdleService implements Storage {
     // introduction of DbStorage, but should be revisited.
     // TODO(wfarner): Consider revisiting to execute async work only when the transaction is
     // successful.
-    return gatedWorkQueue.closeDuring(new GatedOperation<T, E>() {
-      @Override
-      public T doWithGateClosed() throws E {
-        try {
-          return transactionedWrite(work);
-        } catch (PersistenceException e) {
-          throw new StorageException(e.getMessage(), e);
-        }
+    return gatedWorkQueue.closeDuring((GatedOperation<T, E>) () -> {
+      try {
+        return transactionedWrite(work);
+      } catch (PersistenceException e) {
+        throw new StorageException(e.getMessage(), e);
       }
     });
   }
@@ -179,23 +176,20 @@ class DbStorage extends AbstractIdleService implements Storage {
   public <E extends Exception> void bulkLoad(MutateWork.NoResult<E> work)
       throws StorageException, E {
 
-    gatedWorkQueue.closeDuring(new GatedOperation<Void, E>() {
-      @Override
-      public Void doWithGateClosed() throws E {
-        // Disabling the undo log disables transaction rollback, but dramatically speeds up a bulk
-        // insert.
-        try (SqlSession session = sessionFactory.openSession(false)) {
-          try {
-            session.update(DISABLE_UNDO_LOG);
-            work.apply(storeProvider);
-          } catch (PersistenceException e) {
-            throw new StorageException(e.getMessage(), e);
-          } finally {
-            session.update(ENABLE_UNDO_LOG);
-          }
+    gatedWorkQueue.closeDuring(() -> {
+      // Disabling the undo log disables transaction rollback, but dramatically speeds up a bulk
+      // insert.
+      try (SqlSession session = sessionFactory.openSession(false)) {
+        try {
+          session.update(DISABLE_UNDO_LOG);
+          work.apply(storeProvider);
+        } catch (PersistenceException e) {
+          throw new StorageException(e.getMessage(), e);
+        } finally {
+          session.update(ENABLE_UNDO_LOG);
         }
-        return null;
       }
+      return null;
     });
   }
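
The bulkLoad conversion above keeps the original control flow: the lambda body still wraps the bulk work in a try-with-resources session plus an inner try/finally, so the undo log is re-enabled even when the insert fails. A generic sketch of that guard pattern with a hypothetical Session type (the DISABLE/ENABLE statements are placeholders, not the real SQL):

    // Hypothetical Session and statements; only the nesting pattern matters here.
    final class UndoLogGuardSketch {
      private static final String DISABLE_UNDO_LOG = "<disable-undo-log statement>";
      private static final String ENABLE_UNDO_LOG = "<enable-undo-log statement>";

      interface Session extends AutoCloseable {
        void update(String statement);
        @Override
        void close();
      }

      interface BulkWork {
        void run() throws Exception;
      }

      static void bulkLoad(Session sessionFromFactory, BulkWork work) throws Exception {
        try (Session session = sessionFromFactory) {   // session closed on every path
          try {
            session.update(DISABLE_UNDO_LOG);          // trade rollback for bulk-insert speed
            work.run();
          } finally {
            session.update(ENABLE_UNDO_LOG);           // always restore rollback support
          }
        }
      }

      private UndoLogGuardSketch() { }
    }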
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/storage/db/RowGarbageCollector.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/RowGarbageCollector.java b/src/main/java/org/apache/aurora/scheduler/storage/db/RowGarbageCollector.java
index df6e583..2684054 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/db/RowGarbageCollector.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/RowGarbageCollector.java
@@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.util.concurrent.AbstractScheduledService;
 
 import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
 import org.apache.ibatis.exceptions.PersistenceException;
 import org.apache.ibatis.session.SqlSession;
 import org.apache.ibatis.session.SqlSessionFactory;
@@ -72,18 +73,15 @@ class RowGarbageCollector extends AbstractScheduledService {
 
     final AtomicLong deletedCount = new AtomicLong();
     for (Class<? extends GarbageCollectedTableMapper> tableClass : TABLES) {
-      storage.write(new Storage.MutateWork.NoResult.Quiet() {
-        @Override
-        public void execute(Storage.MutableStoreProvider storeProvider) {
-          try (SqlSession session = sessionFactory.openSession(true)) {
-            GarbageCollectedTableMapper table = session.getMapper(tableClass);
-            for (long rowId : table.selectAllRowIds()) {
-              try {
-                table.deleteRow(rowId);
-                deletedCount.incrementAndGet();
-              } catch (PersistenceException e) {
-                // Expected for rows that are still referenced.
-              }
+      storage.write((NoResult.Quiet) (Storage.MutableStoreProvider storeProvider) -> {
+        try (SqlSession session = sessionFactory.openSession(true)) {
+          GarbageCollectedTableMapper table = session.getMapper(tableClass);
+          for (long rowId : table.selectAllRowIds()) {
+            try {
+              table.deleteRow(rowId);
+              deletedCount.incrementAndGet();
+            } catch (PersistenceException e) {
+              // Expected for rows that are still referenced.
             }
           }
         }

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/storage/log/EntrySerializer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/EntrySerializer.java b/src/main/java/org/apache/aurora/scheduler/storage/log/EntrySerializer.java
index 9b6add9..dea2bda 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/EntrySerializer.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/EntrySerializer.java
@@ -80,12 +80,7 @@ public interface EntrySerializer {
       final byte[] header = encode(
           Frame.header(new FrameHeader(chunks, ByteBuffer.wrap(checksum(entry)))));
 
-      return new Iterable<byte[]>() {
-        @Override
-        public Iterator<byte[]> iterator() {
-          return streamFrames(header, chunks, entry);
-        }
-      };
+      return () -> streamFrames(header, chunks, entry);
     }
 
     Iterator<byte[]> streamFrames(final byte[] header, final int chunks, final byte[] entry) {

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
index adbf459..8aaff22 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
@@ -32,7 +32,6 @@ import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
 import org.apache.aurora.common.application.ShutdownRegistry;
 import org.apache.aurora.common.base.Closure;
-import org.apache.aurora.common.base.Command;
 import org.apache.aurora.common.inject.TimedInterceptor.Timed;
 import org.apache.aurora.common.quantity.Amount;
 import org.apache.aurora.common.quantity.Time;
@@ -61,6 +60,7 @@ import org.apache.aurora.scheduler.storage.QuotaStore;
 import org.apache.aurora.scheduler.storage.SchedulerStore;
 import org.apache.aurora.scheduler.storage.SnapshotStore;
 import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
 import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
 import org.apache.aurora.scheduler.storage.TaskStore;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
@@ -158,16 +158,10 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
     ScheduledExecutorSchedulingService(ShutdownRegistry shutdownRegistry,
         Amount<Long, Time> shutdownGracePeriod) {
       scheduledExecutor = AsyncUtil.singleThreadLoggingScheduledExecutor("LogStorage-%d", LOG);
-      shutdownRegistry.addAction(new Command() {
-
-        @Override
-        public void execute() throws RuntimeException {
-          MoreExecutors.shutdownAndAwaitTermination(
-              scheduledExecutor,
-              shutdownGracePeriod.getValue(),
-              shutdownGracePeriod.getUnit().getTimeUnit());
-        }
-      });
+      shutdownRegistry.addAction(() -> MoreExecutors.shutdownAndAwaitTermination(
+          scheduledExecutor,
+          shutdownGracePeriod.getValue(),
+          shutdownGracePeriod.getUnit().getTimeUnit()));
     }
 
     @Override
@@ -311,32 +305,21 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
   @VisibleForTesting
   final Map<LogEntry._Fields, Closure<LogEntry>> buildLogEntryReplayActions() {
     return ImmutableMap.<LogEntry._Fields, Closure<LogEntry>>builder()
-        .put(LogEntry._Fields.SNAPSHOT, new Closure<LogEntry>() {
-          @Override
-          public void execute(LogEntry logEntry) {
-            Snapshot snapshot = logEntry.getSnapshot();
-            LOG.info("Applying snapshot taken on " + new Date(snapshot.getTimestamp()));
-            snapshotStore.applySnapshot(snapshot);
-          }
+        .put(LogEntry._Fields.SNAPSHOT, logEntry -> {
+          Snapshot snapshot = logEntry.getSnapshot();
+          LOG.info("Applying snapshot taken on " + new Date(snapshot.getTimestamp()));
+          snapshotStore.applySnapshot(snapshot);
         })
-        .put(LogEntry._Fields.TRANSACTION, new Closure<LogEntry>() {
+        .put(LogEntry._Fields.TRANSACTION, logEntry -> write(new MutateWork.NoResult.Quiet() {
           @Override
-          public void execute(final LogEntry logEntry) {
-            write(new MutateWork.NoResult.Quiet() {
-              @Override
-              public void execute(MutableStoreProvider unused) {
-                for (Op op : logEntry.getTransaction().getOps()) {
-                  replayOp(op);
-                }
-              }
-            });
-          }
-        })
-        .put(LogEntry._Fields.NOOP, new Closure<LogEntry>() {
-          @Override
-          public void execute(LogEntry item) {
-            // Nothing to do here
+          public void execute(MutableStoreProvider unused) {
+            for (Op op : logEntry.getTransaction().getOps()) {
+              replayOp(op);
+            }
           }
+        }))
+        .put(LogEntry._Fields.NOOP, item -> {
+          // Nothing to do here
         })
         .build();
   }
@@ -344,124 +327,77 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
   @VisibleForTesting
   final Map<Op._Fields, Closure<Op>> buildTransactionReplayActions() {
     return ImmutableMap.<Op._Fields, Closure<Op>>builder()
-        .put(Op._Fields.SAVE_FRAMEWORK_ID, new Closure<Op>() {
-          @Override
-          public void execute(Op op) {
-            writeBehindSchedulerStore.saveFrameworkId(op.getSaveFrameworkId().getId());
-          }
+        .put(
+            Op._Fields.SAVE_FRAMEWORK_ID,
+            op -> writeBehindSchedulerStore.saveFrameworkId(op.getSaveFrameworkId().getId()))
+        .put(Op._Fields.SAVE_CRON_JOB, op -> {
+          SaveCronJob cronJob = op.getSaveCronJob();
+          writeBehindJobStore.saveAcceptedJob(
+              IJobConfiguration.build(cronJob.getJobConfig()));
         })
-        .put(Op._Fields.SAVE_CRON_JOB, new Closure<Op>() {
-          @Override
-          public void execute(Op op) {
-            SaveCronJob cronJob = op.getSaveCronJob();
-            writeBehindJobStore.saveAcceptedJob(
-                IJobConfiguration.build(cronJob.getJobConfig()));
-          }
+        .put(
+            Op._Fields.REMOVE_JOB,
+            op -> writeBehindJobStore.removeJob(IJobKey.build(op.getRemoveJob().getJobKey())))
+        .put(
+            Op._Fields.SAVE_TASKS,
+            op -> writeBehindTaskStore.saveTasks(
+            IScheduledTask.setFromBuilders(op.getSaveTasks().getTasks())))
+        .put(Op._Fields.REWRITE_TASK, op -> {
+          RewriteTask rewriteTask = op.getRewriteTask();
+          writeBehindTaskStore.unsafeModifyInPlace(
+              rewriteTask.getTaskId(),
+              ITaskConfig.build(rewriteTask.getTask()));
         })
-        .put(Op._Fields.REMOVE_JOB, new Closure<Op>() {
-          @Override
-          public void execute(Op op) {
-            writeBehindJobStore.removeJob(IJobKey.build(op.getRemoveJob().getJobKey()));
-          }
+        .put(
+            Op._Fields.REMOVE_TASKS,
+            op -> writeBehindTaskStore.deleteTasks(op.getRemoveTasks().getTaskIds()))
+        .put(Op._Fields.SAVE_QUOTA, op -> {
+          SaveQuota saveQuota = op.getSaveQuota();
+          writeBehindQuotaStore.saveQuota(
+              saveQuota.getRole(),
+              IResourceAggregate.build(saveQuota.getQuota()));
         })
-        .put(Op._Fields.SAVE_TASKS, new Closure<Op>() {
-          @Override
-          public void execute(Op op) {
-            writeBehindTaskStore.saveTasks(
-                IScheduledTask.setFromBuilders(op.getSaveTasks().getTasks()));
+        .put(
+            Op._Fields.REMOVE_QUOTA,
+            op -> writeBehindQuotaStore.removeQuota(op.getRemoveQuota().getRole()))
+        .put(Op._Fields.SAVE_HOST_ATTRIBUTES, op -> {
+          HostAttributes attributes = op.getSaveHostAttributes().getHostAttributes();
+          // Prior to commit 5cf760b, the store would persist maintenance mode changes for
+          // unknown hosts.  5cf760b began rejecting these, but the replicated log may still
+          // contain entries with a null slave ID.
+          if (attributes.isSetSlaveId()) {
+            writeBehindAttributeStore.saveHostAttributes(IHostAttributes.build(attributes));
+          } else {
+            LOG.info("Dropping host attributes with no slave ID: " + attributes);
           }
         })
-        .put(Op._Fields.REWRITE_TASK, new Closure<Op>() {
-          @Override
-          public void execute(Op op) {
-            RewriteTask rewriteTask = op.getRewriteTask();
-            writeBehindTaskStore.unsafeModifyInPlace(
-                rewriteTask.getTaskId(),
-                ITaskConfig.build(rewriteTask.getTask()));
-          }
-        })
-        .put(Op._Fields.REMOVE_TASKS, new Closure<Op>() {
-          @Override
-          public void execute(Op op) {
-            writeBehindTaskStore.deleteTasks(op.getRemoveTasks().getTaskIds());
-          }
-        })
-        .put(Op._Fields.SAVE_QUOTA, new Closure<Op>() {
-          @Override
-          public void execute(Op op) {
-            SaveQuota saveQuota = op.getSaveQuota();
-            writeBehindQuotaStore.saveQuota(
-                saveQuota.getRole(),
-                IResourceAggregate.build(saveQuota.getQuota()));
-          }
+        .put(
+            Op._Fields.SAVE_LOCK,
+            op -> writeBehindLockStore.saveLock(ILock.build(op.getSaveLock().getLock())))
+        .put(
+            Op._Fields.REMOVE_LOCK,
+            op -> writeBehindLockStore.removeLock(ILockKey.build(op.getRemoveLock().getLockKey())))
+        .put(Op._Fields.SAVE_JOB_UPDATE, op -> {
+          JobUpdate update = op.getSaveJobUpdate().getJobUpdate();
+          writeBehindJobUpdateStore.saveJobUpdate(
+              IJobUpdate.build(update),
+              Optional.fromNullable(op.getSaveJobUpdate().getLockToken()));
         })
-        .put(Op._Fields.REMOVE_QUOTA, new Closure<Op>() {
-          @Override
-          public void execute(Op op) {
-            writeBehindQuotaStore.removeQuota(op.getRemoveQuota().getRole());
-          }
+        .put(Op._Fields.SAVE_JOB_UPDATE_EVENT, op -> {
+          SaveJobUpdateEvent event = op.getSaveJobUpdateEvent();
+          writeBehindJobUpdateStore.saveJobUpdateEvent(
+              IJobUpdateKey.build(event.getKey()),
+              IJobUpdateEvent.build(op.getSaveJobUpdateEvent().getEvent()));
         })
-        .put(Op._Fields.SAVE_HOST_ATTRIBUTES, new Closure<Op>() {
-          @Override
-          public void execute(Op op) {
-            HostAttributes attributes = op.getSaveHostAttributes().getHostAttributes();
-            // Prior to commit 5cf760b, the store would persist maintenance mode changes for
-            // unknown hosts.  5cf760b began rejecting these, but the replicated log may still
-            // contain entries with a null slave ID.
-            if (attributes.isSetSlaveId()) {
-              writeBehindAttributeStore.saveHostAttributes(IHostAttributes.build(attributes));
-            } else {
-              LOG.info("Dropping host attributes with no slave ID: " + attributes);
-            }
-          }
+        .put(Op._Fields.SAVE_JOB_INSTANCE_UPDATE_EVENT, op -> {
+          SaveJobInstanceUpdateEvent event = op.getSaveJobInstanceUpdateEvent();
+          writeBehindJobUpdateStore.saveJobInstanceUpdateEvent(
+              IJobUpdateKey.build(event.getKey()),
+              IJobInstanceUpdateEvent.build(op.getSaveJobInstanceUpdateEvent().getEvent()));
         })
-        .put(Op._Fields.SAVE_LOCK, new Closure<Op>() {
-          @Override
-          public void execute(Op op) {
-            writeBehindLockStore.saveLock(ILock.build(op.getSaveLock().getLock()));
-          }
-        })
-        .put(Op._Fields.REMOVE_LOCK, new Closure<Op>() {
-          @Override
-          public void execute(Op op) {
-            writeBehindLockStore.removeLock(ILockKey.build(op.getRemoveLock().getLockKey()));
-          }
-        })
-        .put(Op._Fields.SAVE_JOB_UPDATE, new Closure<Op>() {
-          @Override
-          public void execute(Op op) {
-            JobUpdate update = op.getSaveJobUpdate().getJobUpdate();
-            writeBehindJobUpdateStore.saveJobUpdate(
-                IJobUpdate.build(update),
-                Optional.fromNullable(op.getSaveJobUpdate().getLockToken()));
-          }
-        })
-        .put(Op._Fields.SAVE_JOB_UPDATE_EVENT, new Closure<Op>() {
-          @Override
-          public void execute(Op op) {
-            SaveJobUpdateEvent event = op.getSaveJobUpdateEvent();
-            writeBehindJobUpdateStore.saveJobUpdateEvent(
-                IJobUpdateKey.build(event.getKey()),
-                IJobUpdateEvent.build(op.getSaveJobUpdateEvent().getEvent()));
-          }
-        })
-        .put(Op._Fields.SAVE_JOB_INSTANCE_UPDATE_EVENT, new Closure<Op>() {
-          @Override
-          public void execute(Op op) {
-            SaveJobInstanceUpdateEvent event = op.getSaveJobInstanceUpdateEvent();
-            writeBehindJobUpdateStore.saveJobInstanceUpdateEvent(
-                IJobUpdateKey.build(event.getKey()),
-                IJobInstanceUpdateEvent.build(op.getSaveJobInstanceUpdateEvent().getEvent()));
-          }
-        })
-        .put(Op._Fields.PRUNE_JOB_UPDATE_HISTORY, new Closure<Op>() {
-          @Override
-          public void execute(Op op) {
-            writeBehindJobUpdateStore.pruneHistory(
-                op.getPruneJobUpdateHistory().getPerJobRetainCount(),
-                op.getPruneJobUpdateHistory().getHistoryPruneThresholdMs());
-          }
-        }).build();
+        .put(Op._Fields.PRUNE_JOB_UPDATE_HISTORY, op -> writeBehindJobUpdateStore.pruneHistory(
+            op.getPruneJobUpdateHistory().getPerJobRetainCount(),
+            op.getPruneJobUpdateHistory().getHistoryPruneThresholdMs())).build();
   }
 
   @Override
@@ -477,19 +413,16 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
 
   @Override
   public synchronized void start(final MutateWork.NoResult.Quiet initializationLogic) {
-    write(new MutateWork.NoResult.Quiet() {
-      @Override
-      public void execute(MutableStoreProvider unused) {
-        // Must have the underlying storage started so we can query it for the last checkpoint.
-        // We replay these entries in the forwarded storage system's transactions but not ours - we
-        // do not want to re-record these ops to the log.
-        recover();
-        recovered = true;
-
-        // Now that we're recovered we should let any mutations done in initializationLogic append
-        // to the log, so run it in one of our transactions.
-        write(initializationLogic);
-      }
+    write((NoResult.Quiet) (MutableStoreProvider unused) -> {
+      // Must have the underlying storage started so we can query it for the last checkpoint.
+      // We replay these entries in the forwarded storage system's transactions but not ours - we
+      // do not want to re-record these ops to the log.
+      recover();
+      recovered = true;
+
+      // Now that we're recovered we should let any mutations done in initializationLogic append
+      // to the log, so run it in one of our transactions.
+      write(initializationLogic);
     });
 
     scheduleSnapshots();
@@ -502,19 +435,11 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
 
   @Timed("scheduler_log_recover")
   void recover() throws RecoveryFailedException {
-    writeBehindStorage.bulkLoad(new MutateWork.NoResult.Quiet() {
-      @Override
-      public void execute(MutableStoreProvider storeProvider) {
-        try {
-          streamManager.readFromBeginning(new Closure<LogEntry>() {
-            @Override
-            public void execute(LogEntry logEntry) {
-              replay(logEntry);
-            }
-          });
-        } catch (CodingException | InvalidPositionException | StreamAccessException e) {
-          throw new RecoveryFailedException(e);
-        }
+    writeBehindStorage.bulkLoad(storeProvider -> {
+      try {
+        streamManager.readFromBeginning(LogStorage.this::replay);
+      } catch (CodingException | InvalidPositionException | StreamAccessException e) {
+        throw new RecoveryFailedException(e);
       }
     });
   }
@@ -545,17 +470,14 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
 
   private void scheduleSnapshots() {
     if (snapshotInterval.getValue() > 0) {
-      schedulingService.doEvery(snapshotInterval, new Runnable() {
-        @Override
-        public void run() {
-          try {
-            snapshot();
-          } catch (StorageException e) {
-            if (e.getCause() == null) {
-              LOG.log(Level.WARNING, "StorageException when attempting to snapshot.", e);
-            } else {
-              LOG.log(Level.WARNING, e.getMessage(), e.getCause());
-            }
+      schedulingService.doEvery(snapshotInterval, () -> {
+        try {
+          snapshot();
+        } catch (StorageException e) {
+          if (e.getCause() == null) {
+            LOG.log(Level.WARNING, "StorageException when attempting to snapshot.", e);
+          } else {
+            LOG.log(Level.WARNING, e.getMessage(), e.getCause());
           }
         }
       });
@@ -571,21 +493,16 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
    */
   @Timed("scheduler_log_snapshot")
   void doSnapshot() throws CodingException, InvalidPositionException, StreamAccessException {
-    write(new MutateWork.NoResult<CodingException>() {
-      @Override
-      public void execute(MutableStoreProvider unused)
-          throws CodingException, InvalidPositionException, StreamAccessException {
-
-        LOG.info("Creating snapshot.");
-        Snapshot snapshot = snapshotStore.createSnapshot();
-        persist(snapshot);
-        LOG.info("Snapshot complete."
-            + " host attrs: " + snapshot.getHostAttributesSize()
-            + ", cron jobs: " + snapshot.getCronJobsSize()
-            + ", locks: " + snapshot.getLocksSize()
-            + ", quota confs: " + snapshot.getQuotaConfigurationsSize()
-            + ", tasks: " + snapshot.getTasksSize());
-      }
+    write((NoResult<CodingException>) (MutableStoreProvider unused) -> {
+      LOG.info("Creating snapshot.");
+      Snapshot snapshot = snapshotStore.createSnapshot();
+      persist(snapshot);
+      LOG.info("Snapshot complete."
+          + " host attrs: " + snapshot.getHostAttributesSize()
+          + ", cron jobs: " + snapshot.getCronJobsSize()
+          + ", locks: " + snapshot.getLocksSize()
+          + ", quota confs: " + snapshot.getQuotaConfigurationsSize()
+          + ", tasks: " + snapshot.getTasksSize());
     });
   }
 
@@ -608,21 +525,18 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
 
     transaction = streamManager.startTransaction();
     try {
-      return writeBehindStorage.write(new MutateWork<T, E>() {
-        @Override
-        public T apply(MutableStoreProvider unused) throws E {
-          T result = work.apply(writeAheadStorage);
-          try {
-            transaction.commit();
-          } catch (CodingException e) {
-            throw new IllegalStateException(
-                "Problem encoding transaction operations to the log stream", e);
-          } catch (StreamAccessException e) {
-            throw new StorageException(
-                "There was a problem committing the transaction to the log.", e);
-          }
-          return result;
+      return writeBehindStorage.write(unused -> {
+        T result = work.apply(writeAheadStorage);
+        try {
+          transaction.commit();
+        } catch (CodingException e) {
+          throw new IllegalStateException(
+              "Problem encoding transaction operations to the log stream", e);
+        } catch (StreamAccessException e) {
+          throw new StorageException(
+              "There was a problem committing the transaction to the log.", e);
         }
+        return result;
       });
     } finally {
       transaction = null;
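
In the two replay-action builders above, every map value is a Closure whose lone execute method is now supplied as a lambda or method reference (for example LogStorage.this::replay). A compact sketch of dispatching replay operations through such a map, with a hypothetical OpType/Handler pair standing in for Op._Fields and Closure:

    import java.util.Map;

    import com.google.common.collect.ImmutableMap;

    // Hypothetical dispatch-table sketch; not the Aurora types.
    final class ReplayDispatchSketch {
      enum OpType { SAVE_TASK, REMOVE_TASK, NOOP }

      @FunctionalInterface
      interface Handler {
        void execute(String payload);
      }

      static final Map<OpType, Handler> ACTIONS =
          ImmutableMap.<OpType, Handler>builder()
              .put(OpType.SAVE_TASK, payload -> System.out.println("save " + payload))
              .put(OpType.REMOVE_TASK, payload -> System.out.println("remove " + payload))
              .put(OpType.NOOP, payload -> {
                // Nothing to do here, mirroring the NOOP entry.
              })
              .build();

      static void replay(OpType type, String payload) {
        ACTIONS.get(type).execute(payload);
      }

      private ReplayDispatchSketch() { }
    }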

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotDeduplicator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotDeduplicator.java b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotDeduplicator.java
index 79536f1..de14413 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotDeduplicator.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotDeduplicator.java
@@ -62,12 +62,7 @@ public interface SnapshotDeduplicator {
     private static final Logger LOG = Logger.getLogger(SnapshotDeduplicatorImpl.class.getName());
 
     private static final Function<ScheduledTask, TaskConfig> SCHEDULED_TO_CONFIG =
-        new Function<ScheduledTask, TaskConfig>() {
-          @Override
-          public TaskConfig apply(ScheduledTask task) {
-            return task.getAssignedTask().getTask();
-          }
-        };
+        task -> task.getAssignedTask().getTask();
 
     private static ScheduledTask deepCopyWithoutTaskConfig(ScheduledTask scheduledTask) {
       ScheduledTask scheduledTaskCopy = new ScheduledTask();

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
index ed1fffe..61058c7 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
@@ -16,6 +16,7 @@ package org.apache.aurora.scheduler.storage.log;
 import java.util.Arrays;
 import java.util.Map;
 import java.util.logging.Logger;
+
 import javax.inject.Inject;
 
 import com.google.common.base.Optional;
@@ -39,7 +40,7 @@ import org.apache.aurora.scheduler.storage.JobUpdateStore;
 import org.apache.aurora.scheduler.storage.SnapshotStore;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
 import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.Volatile;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
@@ -246,28 +247,25 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> {
   public Snapshot createSnapshot() {
     // It's important to perform snapshot creation in a write lock to ensure all upstream callers
     // are correctly synchronized (e.g. during backup creation).
-    return storage.write(new MutateWork.Quiet<Snapshot>() {
-      @Override
-      public Snapshot apply(MutableStoreProvider storeProvider) {
-        Snapshot snapshot = new Snapshot();
-
-        // Capture timestamp to signify the beginning of a snapshot operation, apply after in case
-        // one of the field closures is mean and tries to apply a timestamp.
-        long timestamp = clock.nowMillis();
-        for (SnapshotField field : SNAPSHOT_FIELDS) {
-          field.saveToSnapshot(storeProvider, snapshot);
-        }
+    return storage.write(storeProvider -> {
+      Snapshot snapshot = new Snapshot();
+
+      // Capture timestamp to signify the beginning of a snapshot operation, apply after in case
+      // one of the field closures is mean and tries to apply a timestamp.
+      long timestamp = clock.nowMillis();
+      for (SnapshotField field : SNAPSHOT_FIELDS) {
+        field.saveToSnapshot(storeProvider, snapshot);
+      }
 
-        SchedulerMetadata metadata = new SchedulerMetadata()
-            .setFrameworkId(storeProvider.getSchedulerStore().fetchFrameworkId().orNull())
-            .setVersion(CURRENT_API_VERSION);
+      SchedulerMetadata metadata = new SchedulerMetadata()
+          .setFrameworkId(storeProvider.getSchedulerStore().fetchFrameworkId().orNull())
+          .setVersion(CURRENT_API_VERSION);
 
-        metadata.setDetails(buildInfo.getProperties());
+      metadata.setDetails(buildInfo.getProperties());
 
-        snapshot.setSchedulerMetadata(metadata);
-        snapshot.setTimestamp(timestamp);
-        return snapshot;
-      }
+      snapshot.setSchedulerMetadata(metadata);
+      snapshot.setTimestamp(timestamp);
+      return snapshot;
     });
   }
 
@@ -276,14 +274,11 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> {
   public void applySnapshot(final Snapshot snapshot) {
     requireNonNull(snapshot);
 
-    storage.write(new MutateWork.NoResult.Quiet() {
-      @Override
-      public void execute(MutableStoreProvider storeProvider) {
-        LOG.info("Restoring snapshot.");
+    storage.write((NoResult.Quiet) (MutableStoreProvider storeProvider) -> {
+      LOG.info("Restoring snapshot.");
 
-        for (SnapshotField field : SNAPSHOT_FIELDS) {
-          field.restoreFromSnapshot(storeProvider, snapshot);
-        }
+      for (SnapshotField field : SNAPSHOT_FIELDS) {
+        field.restoreFromSnapshot(storeProvider, snapshot);
       }
     });
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java
index 072fe45..01448ae 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java
@@ -73,19 +73,9 @@ class MemTaskStore implements TaskStore.Mutable {
   private final long slowQueryThresholdNanos = SLOW_QUERY_LOG_THRESHOLD.get().as(Time.NANOSECONDS);
 
   private static final Function<Query.Builder, Optional<Set<IJobKey>>> QUERY_TO_JOB_KEY =
-      new Function<Query.Builder, Optional<Set<IJobKey>>>() {
-        @Override
-        public Optional<Set<IJobKey>> apply(Query.Builder query) {
-          return JobKeys.from(query);
-        }
-      };
+      JobKeys::from;
   private static final Function<Query.Builder, Optional<Set<String>>> QUERY_TO_SLAVE_HOST =
-      new Function<Query.Builder, Optional<Set<String>>>() {
-        @Override
-        public Optional<Set<String>> apply(Query.Builder query) {
-          return Optional.fromNullable(query.get().getSlaveHosts());
-        }
-      };
+      query -> Optional.fromNullable(query.get().getSlaveHosts());
 
   // Since this class operates under the API and umbrella of {@link Storage}, it is expected to be
   // thread-safe but not necessarily strongly-consistent unless the externally-controlled storage
@@ -149,13 +139,7 @@ class MemTaskStore implements TaskStore.Mutable {
         .toSet();
   }
 
-  private final Function<IScheduledTask, Task> toTask =
-      new Function<IScheduledTask, Task>() {
-        @Override
-        public Task apply(IScheduledTask task) {
-          return new Task(task, configInterner);
-        }
-      };
+  private final Function<IScheduledTask, Task> toTask = task -> new Task(task, configInterner);
 
   @Timed("mem_storage_save_tasks")
   @Override
@@ -290,13 +274,7 @@ class MemTaskStore implements TaskStore.Mutable {
     return FluentIterable.from(from.get()).filter(queryFilter(query));
   }
 
-  private static final Function<Task, IScheduledTask> TO_SCHEDULED =
-      new Function<Task, IScheduledTask>() {
-        @Override
-        public IScheduledTask apply(Task task) {
-          return task.storedTask;
-        }
-      };
+  private static final Function<Task, IScheduledTask> TO_SCHEDULED = task -> task.storedTask;
 
   private static final Function<Task, String> TO_ID =
       Functions.compose(Tasks::id, TO_SCHEDULED);

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/storage/mem/Util.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/Util.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/Util.java
index 65043fe..c28fb65 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/mem/Util.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/Util.java
@@ -13,8 +13,6 @@
  */
 package org.apache.aurora.scheduler.storage.mem;
 
-import javax.annotation.Nullable;
-
 import com.google.common.base.Function;
 
 import org.apache.thrift.TBase;
@@ -36,17 +34,14 @@ final class Util {
    * @return A copier for the provided type of thrift structs.
    */
   static <T extends TBase<T, ?>> Function<T, T> deepCopier() {
-    return new Function<T, T>() {
-      @Override
-      public T apply(@Nullable T input) {
-        if (input == null) {
-          return null;
-        }
-
-        @SuppressWarnings("unchecked")
-        T t = (T) input.deepCopy();
-        return t;
+    return input -> {
+      if (input == null) {
+        return null;
       }
+
+      @SuppressWarnings("unchecked")
+      T t = (T) input.deepCopy();
+      return t;
     };
   }
 }
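
The Util.deepCopier() change above keeps the null check and the unchecked cast, just inside a lambda body. A minimal sketch of the same shape, using a hypothetical Copyable interface in place of Thrift's TBase:

import java.util.function.Function;

// Simplified stand-in for the Thrift deep copy in Util.deepCopier();
// Copyable is a hypothetical interface, not org.apache.thrift.TBase.
public final class DeepCopierSketch {
  interface Copyable { Object deepCopy(); }

  static final class Settings implements Copyable {
    final String name;
    Settings(String name) { this.name = name; }
    @Override public Object deepCopy() { return new Settings(name); }
  }

  // A generic factory method can still return a lambda; local-variable
  // annotations such as @SuppressWarnings are legal inside the lambda body.
  static <T extends Copyable> Function<T, T> deepCopier() {
    return input -> {
      if (input == null) {
        return null;
      }

      @SuppressWarnings("unchecked")
      T copy = (T) input.deepCopy();
      return copy;
    };
  }

  public static void main(String[] args) {
    Function<Settings, Settings> copier = deepCopier();
    Settings original = new Settings("prod");
    Settings copy = copier.apply(original);
    System.out.println(copy.name + " " + (copy != original)); // prod true
  }

  private DeepCopierSketch() { }
}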

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/testing/FakeStatsProvider.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/testing/FakeStatsProvider.java b/src/main/java/org/apache/aurora/scheduler/testing/FakeStatsProvider.java
index 5d0eaba..f9d0baf 100644
--- a/src/main/java/org/apache/aurora/scheduler/testing/FakeStatsProvider.java
+++ b/src/main/java/org/apache/aurora/scheduler/testing/FakeStatsProvider.java
@@ -16,7 +16,6 @@ package org.apache.aurora.scheduler.testing;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
-import com.google.common.base.Function;
 import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
@@ -48,12 +47,7 @@ public class FakeStatsProvider implements StatsProvider {
   public Map<String, ? extends Number> getAllValues() {
     return ImmutableMap.copyOf(Maps.transformValues(
         stats,
-        new Function<Supplier<? extends Number>, Number>() {
-        @Override
-        public Number apply(Supplier<? extends Number> supplier) {
-          return supplier.get();
-        }
-      }));
+        Supplier::get));
   }
 
   /**
@@ -69,12 +63,7 @@ public class FakeStatsProvider implements StatsProvider {
   @Override
   public AtomicLong makeCounter(String name) {
     final AtomicLong counter = new AtomicLong();
-    stats.put(name, new Supplier<Long>() {
-      @Override
-      public Long get() {
-        return counter.get();
-      }
-    });
+    stats.put(name, counter::get);
     return counter;
   }
 

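The FakeStatsProvider hunks rely on two flavours of method reference: an unbound Supplier::get passed to Maps.transformValues, and a bound counter::get that adapts an AtomicLong to a supplier. A small sketch using java.util.function stand-ins rather than the Guava Supplier/Function types used in the diff:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Supplier;

// Illustrative only; uses java.util.function types, not the Guava ones.
public class SupplierReferenceSketch {
  // Supplier::get as an unbound method reference, the same form passed to
  // Maps.transformValues(stats, Supplier::get) in the hunk above.
  static final Function<Supplier<? extends Number>, Number> SAMPLE = Supplier::get;

  private final Map<String, Supplier<? extends Number>> stats = new HashMap<>();

  // counter::get is a bound method reference: the AtomicLong instance is
  // captured, and its get() matches the supplier's single abstract method.
  AtomicLong makeCounter(String name) {
    AtomicLong counter = new AtomicLong();
    stats.put(name, counter::get);
    return counter;
  }

  Map<String, Number> getAllValues() {
    Map<String, Number> snapshot = new HashMap<>();
    stats.forEach((name, supplier) -> snapshot.put(name, SAMPLE.apply(supplier)));
    return snapshot;
  }

  public static void main(String[] args) {
    SupplierReferenceSketch provider = new SupplierReferenceSketch();
    provider.makeCounter("tasks_lost").addAndGet(3);
    System.out.println(provider.getAllValues()); // {tasks_lost=3}
  }
}
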
http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImpl.java b/src/main/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImpl.java
index 358b80c..c0e8a20 100644
--- a/src/main/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImpl.java
@@ -85,8 +85,6 @@ import org.apache.aurora.scheduler.quota.QuotaInfo;
 import org.apache.aurora.scheduler.quota.QuotaManager;
 import org.apache.aurora.scheduler.state.LockManager;
 import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.Work.Quiet;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
@@ -111,15 +109,10 @@ import static org.apache.aurora.scheduler.thrift.Responses.ok;
 
 class ReadOnlySchedulerImpl implements ReadOnlyScheduler.Iface {
   private static final Function<Entry<ITaskConfig, Collection<Integer>>, ConfigGroup> TO_GROUP =
-      new Function<Entry<ITaskConfig, Collection<Integer>>, ConfigGroup>() {
-        @Override
-        public ConfigGroup apply(Entry<ITaskConfig, Collection<Integer>> input) {
-          return new ConfigGroup(
-              input.getKey().newBuilder(),
-              ImmutableSet.copyOf(input.getValue()),
-              IRange.toBuildersSet(convertRanges(toRanges(input.getValue()))));
-        }
-      };
+      input -> new ConfigGroup(
+          input.getKey().newBuilder(),
+          ImmutableSet.copyOf(input.getValue()),
+          IRange.toBuildersSet(convertRanges(toRanges(input.getValue()))));
 
   private final Storage storage;
   private final NearestFit nearestFit;
@@ -167,12 +160,9 @@ class ReadOnlySchedulerImpl implements ReadOnlyScheduler.Iface {
   public Response getTasksWithoutConfigs(TaskQuery query) {
     List<ScheduledTask> tasks = Lists.transform(
         getTasks(query),
-        new Function<ScheduledTask, ScheduledTask>() {
-          @Override
-          public ScheduledTask apply(ScheduledTask task) {
-            task.getAssignedTask().getTask().unsetExecutorConfig();
-            return task;
-          }
+        task -> {
+          task.getAssignedTask().getTask().unsetExecutorConfig();
+          return task;
         });
 
     return ok(Result.scheduleStatusResult(new ScheduleStatusResult().setTasks(tasks)));
@@ -191,25 +181,17 @@ class ReadOnlySchedulerImpl implements ReadOnlyScheduler.Iface {
     query.setStatuses(ImmutableSet.of(ScheduleStatus.PENDING));
 
     Set<PendingReason> reasons = FluentIterable.from(getTasks(query))
-        .transform(new Function<ScheduledTask, PendingReason>() {
-          @Override
-          public PendingReason apply(ScheduledTask scheduledTask) {
-            TaskGroupKey groupKey = TaskGroupKey.from(
-                ITaskConfig.build(scheduledTask.getAssignedTask().getTask()));
-
-            String reason = Joiner.on(',').join(Iterables.transform(
-                nearestFit.getNearestFit(groupKey),
-                new Function<Veto, String>() {
-                  @Override
-                  public String apply(Veto veto) {
-                    return veto.getReason();
-                  }
-                }));
-
-            return new PendingReason()
-                .setTaskId(scheduledTask.getAssignedTask().getTaskId())
-                .setReason(reason);
-          }
+        .transform(scheduledTask -> {
+          TaskGroupKey groupKey = TaskGroupKey.from(
+              ITaskConfig.build(scheduledTask.getAssignedTask().getTask()));
+
+          String reason = Joiner.on(',').join(Iterables.transform(
+              nearestFit.getNearestFit(groupKey),
+              Veto::getReason));
+
+          return new PendingReason()
+              .setTaskId(scheduledTask.getAssignedTask().getTaskId())
+              .setReason(reason);
         }).toSet();
 
     return ok(Result.getPendingReasonResult(new GetPendingReasonResult(reasons)));
@@ -233,12 +215,9 @@ class ReadOnlySchedulerImpl implements ReadOnlyScheduler.Iface {
 
   @Override
   public Response getRoleSummary() {
-    Multimap<String, IJobKey> jobsByRole = storage.read(new Quiet<Multimap<String, IJobKey>>() {
-      @Override
-      public Multimap<String, IJobKey> apply(StoreProvider storeProvider) {
-        return Multimaps.index(storeProvider.getTaskStore().getJobKeys(), IJobKey::getRole);
-      }
-    });
+    Multimap<String, IJobKey> jobsByRole = storage.read(
+        storeProvider ->
+            Multimaps.index(storeProvider.getTaskStore().getJobKeys(), IJobKey::getRole));
 
     Multimap<String, IJobKey> cronJobsByRole = Multimaps.index(
         Iterables.transform(Storage.Util.fetchCronJobs(storage), IJobConfiguration::getKey),
@@ -246,15 +225,10 @@ class ReadOnlySchedulerImpl implements ReadOnlyScheduler.Iface {
 
     Set<RoleSummary> summaries = FluentIterable.from(
         Sets.union(jobsByRole.keySet(), cronJobsByRole.keySet()))
-        .transform(new Function<String, RoleSummary>() {
-          @Override
-          public RoleSummary apply(String role) {
-            return new RoleSummary(
-                role,
-                jobsByRole.get(role).size(),
-                cronJobsByRole.get(role).size());
-          }
-        })
+        .transform(role -> new RoleSummary(
+            role,
+            jobsByRole.get(role).size(),
+            cronJobsByRole.get(role).size()))
         .toSet();
 
     return ok(Result.roleSummaryResult(new RoleSummaryResult(summaries)));
@@ -264,22 +238,19 @@ class ReadOnlySchedulerImpl implements ReadOnlyScheduler.Iface {
   public Response getJobSummary(@Nullable String maybeNullRole) {
     Optional<String> ownerRole = Optional.fromNullable(maybeNullRole);
 
-    final Multimap<IJobKey, IScheduledTask> tasks = getTasks(maybeRoleScoped(ownerRole));
-    final Map<IJobKey, IJobConfiguration> jobs = getJobs(ownerRole, tasks);
-
-    Function<IJobKey, JobSummary> makeJobSummary = new Function<IJobKey, JobSummary>() {
-      @Override
-      public JobSummary apply(IJobKey jobKey) {
-        IJobConfiguration job = jobs.get(jobKey);
-        JobSummary summary = new JobSummary()
-            .setJob(job.newBuilder())
-            .setStats(Jobs.getJobStats(tasks.get(jobKey)).newBuilder());
-
-        return Strings.isNullOrEmpty(job.getCronSchedule())
-            ? summary
-            : summary.setNextCronRunMs(
-            cronPredictor.predictNextRun(CrontabEntry.parse(job.getCronSchedule())).getTime());
-      }
+    Multimap<IJobKey, IScheduledTask> tasks = getTasks(maybeRoleScoped(ownerRole));
+    Map<IJobKey, IJobConfiguration> jobs = getJobs(ownerRole, tasks);
+
+    Function<IJobKey, JobSummary> makeJobSummary = jobKey -> {
+      IJobConfiguration job = jobs.get(jobKey);
+      JobSummary summary = new JobSummary()
+          .setJob(job.newBuilder())
+          .setStats(Jobs.getJobStats(tasks.get(jobKey)).newBuilder());
+
+      return Strings.isNullOrEmpty(job.getCronSchedule())
+          ? summary
+          : summary.setNextCronRunMs(
+          cronPredictor.predictNextRun(CrontabEntry.parse(job.getCronSchedule())).getTime());
     };
 
     ImmutableSet<JobSummary> jobSummaries =
@@ -299,21 +270,18 @@ class ReadOnlySchedulerImpl implements ReadOnlyScheduler.Iface {
   }
 
   @Override
-  public Response getQuota(final String ownerRole) {
+  public Response getQuota(String ownerRole) {
     MorePreconditions.checkNotBlank(ownerRole);
-    return storage.read(new Quiet<Response>() {
-      @Override
-      public Response apply(StoreProvider storeProvider) {
-        QuotaInfo quotaInfo = quotaManager.getQuotaInfo(ownerRole, storeProvider);
-        GetQuotaResult result = new GetQuotaResult(quotaInfo.getQuota().newBuilder())
-            .setProdSharedConsumption(quotaInfo.getProdSharedConsumption().newBuilder())
-            .setProdDedicatedConsumption(quotaInfo.getProdDedicatedConsumption().newBuilder())
-            .setNonProdSharedConsumption(quotaInfo.getNonProdSharedConsumption().newBuilder())
-            .setNonProdDedicatedConsumption(
-                quotaInfo.getNonProdDedicatedConsumption().newBuilder());
-
-        return ok(Result.getQuotaResult(result));
-      }
+    return storage.read(storeProvider -> {
+      QuotaInfo quotaInfo = quotaManager.getQuotaInfo(ownerRole, storeProvider);
+      GetQuotaResult result = new GetQuotaResult(quotaInfo.getQuota().newBuilder())
+          .setProdSharedConsumption(quotaInfo.getProdSharedConsumption().newBuilder())
+          .setProdDedicatedConsumption(quotaInfo.getProdDedicatedConsumption().newBuilder())
+          .setNonProdSharedConsumption(quotaInfo.getNonProdSharedConsumption().newBuilder())
+          .setNonProdDedicatedConsumption(
+              quotaInfo.getNonProdDedicatedConsumption().newBuilder());
+
+      return ok(Result.getQuotaResult(result));
     });
   }
 
@@ -324,28 +292,20 @@ class ReadOnlySchedulerImpl implements ReadOnlyScheduler.Iface {
   }
 
   @Override
-  public Response getJobUpdateSummaries(final JobUpdateQuery mutableQuery) {
-    final IJobUpdateQuery query = IJobUpdateQuery.build(requireNonNull(mutableQuery));
+  public Response getJobUpdateSummaries(JobUpdateQuery mutableQuery) {
+    IJobUpdateQuery query = IJobUpdateQuery.build(requireNonNull(mutableQuery));
     return ok(Result.getJobUpdateSummariesResult(
-        new GetJobUpdateSummariesResult().setUpdateSummaries(IJobUpdateSummary.toBuildersList(
-            storage.read(new Quiet<List<IJobUpdateSummary>>() {
-              @Override
-              public List<IJobUpdateSummary> apply(StoreProvider storeProvider) {
-                return storeProvider.getJobUpdateStore().fetchJobUpdateSummaries(query);
-              }
-            })))));
+        new GetJobUpdateSummariesResult()
+            .setUpdateSummaries(IJobUpdateSummary.toBuildersList(storage.read(
+                storeProvider ->
+                    storeProvider.getJobUpdateStore().fetchJobUpdateSummaries(query))))));
   }
 
   @Override
   public Response getJobUpdateDetails(JobUpdateKey mutableKey) {
-    final IJobUpdateKey key = IJobUpdateKey.build(mutableKey);
+    IJobUpdateKey key = IJobUpdateKey.build(mutableKey);
     Optional<IJobUpdateDetails> details =
-        storage.read(new Quiet<Optional<IJobUpdateDetails>>() {
-          @Override
-          public Optional<IJobUpdateDetails> apply(StoreProvider storeProvider) {
-            return storeProvider.getJobUpdateStore().fetchJobUpdateDetails(key);
-          }
-        });
+        storage.read(storeProvider -> storeProvider.getJobUpdateStore().fetchJobUpdateDetails(key));
 
     if (details.isPresent()) {
       return ok(Result.getJobUpdateDetailsResult(
@@ -357,43 +317,40 @@ class ReadOnlySchedulerImpl implements ReadOnlyScheduler.Iface {
 
   @Override
   public Response getJobUpdateDiff(JobUpdateRequest mutableRequest) {
-    final IJobUpdateRequest request = IJobUpdateRequest.build(requireNonNull(mutableRequest));
-    final IJobKey job = request.getTaskConfig().getJob();
-
-    return storage.read(new Quiet<Response>() {
-      @Override
-      public Response apply(StoreProvider storeProvider) throws RuntimeException {
-        if (storeProvider.getCronJobStore().fetchJob(job).isPresent()) {
-          return invalidRequest(NO_CRON);
-        }
-
-        JobDiff diff = JobDiff.compute(
-            storeProvider.getTaskStore(),
-            job,
-            JobDiff.asMap(request.getTaskConfig(), request.getInstanceCount()),
-            request.getSettings().getUpdateOnlyTheseInstances());
-
-        Map<Integer, ITaskConfig> replaced = diff.getReplacedInstances();
-        Map<Integer, ITaskConfig> replacements = Maps.asMap(
-            diff.getReplacementInstances(),
-            Functions.constant(request.getTaskConfig()));
-
-        Map<Integer, ITaskConfig> add = Maps.filterKeys(
-            replacements,
-            Predicates.in(Sets.difference(replacements.keySet(), replaced.keySet())));
-        Map<Integer, ITaskConfig> remove = Maps.filterKeys(
-            replaced,
-            Predicates.in(Sets.difference(replaced.keySet(), replacements.keySet())));
-        Map<Integer, ITaskConfig> update = Maps.filterKeys(
-            replaced,
-            Predicates.in(Sets.intersection(replaced.keySet(), replacements.keySet())));
-
-        return ok(Result.getJobUpdateDiffResult(new GetJobUpdateDiffResult()
-            .setAdd(instancesToConfigGroups(add))
-            .setRemove(instancesToConfigGroups(remove))
-            .setUpdate(instancesToConfigGroups(update))
-            .setUnchanged(instancesToConfigGroups(diff.getUnchangedInstances()))));
+    IJobUpdateRequest request = IJobUpdateRequest.build(requireNonNull(mutableRequest));
+    IJobKey job = request.getTaskConfig().getJob();
+
+    return storage.read(storeProvider -> {
+      if (storeProvider.getCronJobStore().fetchJob(job).isPresent()) {
+        return invalidRequest(NO_CRON);
       }
+
+      JobDiff diff = JobDiff.compute(
+          storeProvider.getTaskStore(),
+          job,
+          JobDiff.asMap(request.getTaskConfig(), request.getInstanceCount()),
+          request.getSettings().getUpdateOnlyTheseInstances());
+
+      Map<Integer, ITaskConfig> replaced = diff.getReplacedInstances();
+      Map<Integer, ITaskConfig> replacements = Maps.asMap(
+          diff.getReplacementInstances(),
+          Functions.constant(request.getTaskConfig()));
+
+      Map<Integer, ITaskConfig> add = Maps.filterKeys(
+          replacements,
+          Predicates.in(Sets.difference(replacements.keySet(), replaced.keySet())));
+      Map<Integer, ITaskConfig> remove = Maps.filterKeys(
+          replaced,
+          Predicates.in(Sets.difference(replaced.keySet(), replacements.keySet())));
+      Map<Integer, ITaskConfig> update = Maps.filterKeys(
+          replaced,
+          Predicates.in(Sets.intersection(replaced.keySet(), replacements.keySet())));
+
+      return ok(Result.getJobUpdateDiffResult(new GetJobUpdateDiffResult()
+          .setAdd(instancesToConfigGroups(add))
+          .setRemove(instancesToConfigGroups(remove))
+          .setUpdate(instancesToConfigGroups(update))
+          .setUnchanged(instancesToConfigGroups(diff.getUnchangedInstances()))));
     });
   }
 
@@ -435,23 +392,18 @@ class ReadOnlySchedulerImpl implements ReadOnlyScheduler.Iface {
     Map<IJobKey, IJobConfiguration> jobs = Maps.newHashMap();
 
     jobs.putAll(Maps.transformEntries(tasks.asMap(),
-        new Maps.EntryTransformer<IJobKey, Collection<IScheduledTask>, IJobConfiguration>() {
-          @Override
-          public IJobConfiguration transformEntry(
-              IJobKey jobKey,
-              Collection<IScheduledTask> tasks) {
-
-            // Pick the latest transitioned task for each immediate job since the job can be in the
-            // middle of an update or some shards have been selectively created.
-            TaskConfig mostRecentTaskConfig =
-                Tasks.getLatestActiveTask(tasks).getAssignedTask().getTask().newBuilder();
-
-            return IJobConfiguration.build(new JobConfiguration()
-                .setKey(jobKey.newBuilder())
-                .setOwner(mostRecentTaskConfig.getOwner())
-                .setTaskConfig(mostRecentTaskConfig)
-                .setInstanceCount(tasks.size()));
-          }
+        (jobKey, tasks1) -> {
+
+          // Pick the latest transitioned task for each immediate job since the job can be in the
+          // middle of an update or some shards have been selectively created.
+          TaskConfig mostRecentTaskConfig =
+              Tasks.getLatestActiveTask(tasks1).getAssignedTask().getTask().newBuilder();
+
+          return IJobConfiguration.build(new JobConfiguration()
+              .setKey(jobKey.newBuilder())
+              .setOwner(mostRecentTaskConfig.getOwner())
+              .setTaskConfig(mostRecentTaskConfig)
+              .setInstanceCount(tasks1.size()));
         }));
 
     // Get cron jobs directly from the manager. Do this after querying the task store so the real

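One detail from the final hunk: the anonymous EntryTransformer's second parameter was named tasks, but the lambda parameter had to become tasks1, because lambda parameters cannot reuse a name that is already in scope in the enclosing method. A short sketch of that rule with placeholder data:

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiFunction;

// Sketch of the renaming seen in the last hunk (tasks -> tasks1): unlike an
// anonymous class's method parameters, lambda parameters share the enclosing
// method's scope and cannot shadow its locals.
public class LambdaShadowingSketch {
  public static void main(String[] args) {
    Map<String, List<String>> tasks = new TreeMap<>();
    tasks.put("web/prod/hello", Arrays.asList("task-1", "task-2"));

    // (jobKey, tasks) -> ... would not compile: "tasks" is already defined
    // in the enclosing method, so the parameter is renamed.
    BiFunction<String, List<String>, String> summarize =
        (jobKey, tasks1) -> jobKey + " has " + tasks1.size() + " active instances";

    tasks.forEach((jobKey, instances) ->
        System.out.println(summarize.apply(jobKey, instances)));
  }
}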