Repository: aurora
Updated Branches:
  refs/heads/master b9ede2f7a -> 73e02c06b


Use LockStore only for backwards compatibility

Enter backwards compatibility mode for LockStore.  This means we will restore
and acquire locks as before, but will ignore them otherwise.
Following the next release, `LockStore` will be removed.

Please note that `JobUpdateController` already provides the one-at-a-time
update semantic in addition to using the legacy lock system for the same
purpose.

Reviewed at https://reviews.apache.org/r/63130/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/73e02c06
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/73e02c06
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/73e02c06

Branch: refs/heads/master
Commit: 73e02c06b3accdd0a0e0fcdadf1ec1b46898767f
Parents: b9ede2f
Author: Bill Farner <[email protected]>
Authored: Thu Oct 19 14:25:30 2017 -0700
Committer: Bill Farner <[email protected]>
Committed: Thu Oct 19 14:25:30 2017 -0700

----------------------------------------------------------------------
 .../thrift/org/apache/aurora/gen/api.thrift     |   4 +-
 .../aurora/scheduler/state/LockManager.java     |  29 ++---
 .../aurora/scheduler/state/LockManagerImpl.java |  28 ++---
 .../scheduler/storage/JobUpdateStore.java       |   8 --
 .../scheduler/storage/db/DbJobUpdateStore.java  |   9 --
 .../storage/db/JobUpdateDetailsMapper.java      |   9 --
 .../storage/log/WriteAheadStorage.java          |   5 -
 .../thrift/SchedulerThriftInterface.java        |  48 ++++----
 .../scheduler/updater/JobUpdateController.java  |  19 ++++
 .../updater/JobUpdateControllerImpl.java        |  47 +++-----
 .../python/apache/aurora/client/cli/context.py  |   6 +-
 .../storage/db/JobUpdateDetailsMapper.xml       |   9 --
 .../scheduler/state/LockManagerImplTest.java    |  81 ++------------
 .../storage/db/DbJobUpdateStoreTest.java        |  35 ------
 .../thrift/SchedulerThriftInterfaceTest.java    | 110 +++++++++----------
 .../aurora/scheduler/updater/JobUpdaterIT.java  |  34 +++---
 .../apache/aurora/client/cli/test_create.py     |   2 +-
 .../apache/aurora/client/cli/test_kill.py       |   8 +-
 .../apache/aurora/client/cli/test_supdate.py    |   2 +-
 .../python/apache/aurora/client/cli/util.py     |   3 +-
 .../python/apache/aurora/client/test_base.py    |   4 +-
 21 files changed, 160 insertions(+), 340 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/73e02c06/api/src/main/thrift/org/apache/aurora/gen/api.thrift
----------------------------------------------------------------------
diff --git a/api/src/main/thrift/org/apache/aurora/gen/api.thrift 
b/api/src/main/thrift/org/apache/aurora/gen/api.thrift
index db2220d..c869493 100644
--- a/api/src/main/thrift/org/apache/aurora/gen/api.thrift
+++ b/api/src/main/thrift/org/apache/aurora/gen/api.thrift
@@ -27,8 +27,8 @@ enum ResponseCode {
   ERROR           = 2,
   WARNING         = 3,
   AUTH_FAILED     = 4,
-  /** Raised when a Lock-protected operation failed due to lock validation. */
-  LOCK_ERROR      = 5,
+  /** Raised when an operation was unable to proceed due to an in-progress job 
update. */
+  JOB_UPDATING_ERROR = 5,
   /** Raised when a scheduler is transiently unavailable and later retry is 
recommended. */
   ERROR_TRANSIENT = 6
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/73e02c06/src/main/java/org/apache/aurora/scheduler/state/LockManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/LockManager.java 
b/src/main/java/org/apache/aurora/scheduler/state/LockManager.java
index 2723306..1a65b08 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/LockManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/LockManager.java
@@ -13,46 +13,31 @@
  */
 package org.apache.aurora.scheduler.state;
 
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.ILock;
-import org.apache.aurora.scheduler.storage.entities.ILockKey;
 
 /**
  * Defines all {@link ILock} primitives like: acquire, release, validate.
  */
 public interface LockManager {
   /**
-   * Creates, saves and returns a new {@link ILock} with the specified {@link 
ILockKey}.
+   * Creates, saves and returns a new {@link ILock} with the specified {@link 
IJobKey}.
    * This method is not re-entrant, i.e. attempting to acquire a lock with the
    * same key would throw a {@link LockException}.
    *
-   * @param lockKey A key uniquely identify the lock to be created.
+   * @param job The job being locked.
    * @param user Name of the user requesting a lock.
    * @return A new ILock instance.
    * @throws LockException In case the lock with specified key already exists.
    */
-  ILock acquireLock(ILockKey lockKey, String user) throws LockException;
+  ILock acquireLock(IJobKey job, String user) throws LockException;
 
   /**
-   * Releases (removes) the specified {@link ILock} from the system.
+   * Releases (removes) the lock associated with {@code job} from the system.
    *
-   * @param lock {@link ILock} to remove from the system.
+   * @param job the job to unlock.
    */
-  void releaseLock(ILock lock);
-
-  /**
-   * Asserts that an entity is not locked.
-   *
-   * @param context Operation context to validate with the provided lock.
-   * @throws LockException If provided context is locked.
-   */
-  void assertNotLocked(ILockKey context) throws LockException;
-
-  /**
-   * Returns all available locks stored.
-   *
-   * @return Set of {@link ILock} instances.
-   */
-  Iterable<ILock> getLocks();
+  void releaseLock(IJobKey job);
 
   /**
    * Thrown when {@link ILock} related operation failed.

http://git-wip-us.apache.org/repos/asf/aurora/blob/73e02c06/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java 
b/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java
index 632d256..ec05f50 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java
@@ -22,11 +22,13 @@ import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.aurora.common.util.Clock;
 import org.apache.aurora.gen.Lock;
+import org.apache.aurora.gen.LockKey;
 import org.apache.aurora.gen.LockKey._Fields;
 import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.storage.LockStore;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.ILock;
 import org.apache.aurora.scheduler.storage.entities.ILockKey;
 
@@ -50,10 +52,11 @@ public class LockManagerImpl implements LockManager {
   }
 
   @Override
-  public ILock acquireLock(final ILockKey lockKey, final String user) throws 
LockException {
+  public ILock acquireLock(IJobKey job, final String user) throws 
LockException {
     return storage.write(storeProvider -> {
 
       LockStore.Mutable lockStore = storeProvider.getLockStore();
+      ILockKey lockKey = ILockKey.build(LockKey.job(job.newBuilder()));
       Optional<ILock> existingLock = lockStore.fetchLock(lockKey);
 
       if (existingLock.isPresent()) {
@@ -76,25 +79,10 @@ public class LockManagerImpl implements LockManager {
   }
 
   @Override
-  public void releaseLock(final ILock lock) {
-    storage.write(
-        (NoResult.Quiet) storeProvider -> 
storeProvider.getLockStore().removeLock(lock.getKey()));
-  }
-
-  @Override
-  public void assertNotLocked(final ILockKey context) throws LockException {
-    Optional<ILock> stored = storage.read(
-        storeProvider -> storeProvider.getLockStore().fetchLock(context));
-    if (stored.isPresent()) {
-      throw new LockException(String.format(
-          "Unable to perform operation for %s due to active lock held",
-          formatLockKey(context)));
-    }
-  }
-
-  @Override
-  public Iterable<ILock> getLocks() {
-    return storage.read(storeProvider -> 
storeProvider.getLockStore().fetchLocks());
+  public void releaseLock(IJobKey job) {
+    storage.write((NoResult.Quiet) storeProvider -> {
+      
storeProvider.getLockStore().removeLock(ILockKey.build(LockKey.job(job.newBuilder())));
+    });
   }
 
   private static String formatLockKey(ILockKey lockKey) {

http://git-wip-us.apache.org/repos/asf/aurora/blob/73e02c06/src/main/java/org/apache/aurora/scheduler/storage/JobUpdateStore.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/storage/JobUpdateStore.java 
b/src/main/java/org/apache/aurora/scheduler/storage/JobUpdateStore.java
index 52c3c66..254c9b7 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/JobUpdateStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/JobUpdateStore.java
@@ -83,14 +83,6 @@ public interface JobUpdateStore {
   Set<StoredJobUpdateDetails> fetchAllJobUpdateDetails();
 
   /**
-   * Gets the lock token associated with a job update.
-   *
-   * @param key Update identifier.
-   * @return the token associated with the update id, if it exists.
-   */
-  Optional<String> getLockToken(IJobUpdateKey key);
-
-  /**
    * Fetches the events that have affected an instance within a job update.
    *
    * @param key Update identifier.

http://git-wip-us.apache.org/repos/asf/aurora/blob/73e02c06/src/main/java/org/apache/aurora/scheduler/storage/db/DbJobUpdateStore.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/storage/db/DbJobUpdateStore.java 
b/src/main/java/org/apache/aurora/scheduler/storage/db/DbJobUpdateStore.java
index cbe5a0d..df37fb7 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbJobUpdateStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbJobUpdateStore.java
@@ -269,15 +269,6 @@ public class DbJobUpdateStore implements 
JobUpdateStore.Mutable {
         .toSet();
   }
 
-  @Timed("job_update_store_get_lock_token")
-  @Override
-  public Optional<String> getLockToken(IJobUpdateKey key) {
-    // We assume here that cascading deletes will cause a lock-update 
associative row to disappear
-    // when the lock is invalidated.  This further assumes that a lock row is 
deleted when a lock
-    // is no longer valid.
-    return Optional.fromNullable(detailsMapper.selectLockToken(key));
-  }
-
   @Timed("job_update_store_fetch_instance_events")
   @Override
   public List<IJobInstanceUpdateEvent> fetchInstanceEvents(IJobUpdateKey key, 
int instanceId) {

http://git-wip-us.apache.org/repos/asf/aurora/blob/73e02c06/src/main/java/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.java
 
b/src/main/java/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.java
index 222ac2d..91053ef 100644
--- 
a/src/main/java/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.java
+++ 
b/src/main/java/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.java
@@ -198,15 +198,6 @@ interface JobUpdateDetailsMapper {
   Set<DbStoredJobUpdateDetails> selectAllDetails();
 
   /**
-   * Gets the token associated with an update.
-   *
-   * @param key Update identifier.
-   * @return The associated lock token, or {@code null} if no association 
exists.
-   */
-  @Nullable
-  String selectLockToken(@Param("key") IJobUpdateKey key);
-
-  /**
    * Gets job instance update events for a specific instance within an update.
    *
    * @param key Update identifier.

http://git-wip-us.apache.org/repos/asf/aurora/blob/73e02c06/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java 
b/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
index d145259..170f9ff 100644
--- 
a/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
+++ 
b/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
@@ -457,9 +457,4 @@ class WriteAheadStorage implements
   public List<IJobInstanceUpdateEvent> fetchInstanceEvents(IJobUpdateKey key, 
int instanceId) {
     return this.jobUpdateStore.fetchInstanceEvents(key, instanceId);
   }
-
-  @Override
-  public Optional<String> getLockToken(IJobUpdateKey key) {
-    return this.jobUpdateStore.getLockToken(key);
-  }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/73e02c06/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
 
b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
index b7d9281..534ae59 100644
--- 
a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
+++ 
b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
@@ -54,7 +54,6 @@ import org.apache.aurora.gen.JobUpdateRequest;
 import org.apache.aurora.gen.JobUpdateSettings;
 import org.apache.aurora.gen.JobUpdateSummary;
 import org.apache.aurora.gen.ListBackupsResult;
-import org.apache.aurora.gen.LockKey;
 import org.apache.aurora.gen.MaintenanceStatusResult;
 import org.apache.aurora.gen.PulseJobUpdateResult;
 import org.apache.aurora.gen.QueryRecoveryResult;
@@ -80,8 +79,6 @@ import org.apache.aurora.scheduler.quota.QuotaCheckResult;
 import org.apache.aurora.scheduler.quota.QuotaManager;
 import org.apache.aurora.scheduler.quota.QuotaManager.QuotaException;
 import org.apache.aurora.scheduler.reconciliation.TaskReconciler;
-import org.apache.aurora.scheduler.state.LockManager;
-import org.apache.aurora.scheduler.state.LockManager.LockException;
 import org.apache.aurora.scheduler.state.MaintenanceController;
 import org.apache.aurora.scheduler.state.StateChangeResult;
 import org.apache.aurora.scheduler.state.StateManager;
@@ -98,7 +95,6 @@ import 
org.apache.aurora.scheduler.storage.entities.IJobUpdate;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateRequest;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateSettings;
-import org.apache.aurora.scheduler.storage.entities.ILockKey;
 import org.apache.aurora.scheduler.storage.entities.IMetadata;
 import org.apache.aurora.scheduler.storage.entities.IRange;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -110,6 +106,7 @@ import 
org.apache.aurora.scheduler.thrift.auth.DecoratedThrift;
 import org.apache.aurora.scheduler.updater.JobDiff;
 import org.apache.aurora.scheduler.updater.JobUpdateController;
 import org.apache.aurora.scheduler.updater.JobUpdateController.AuditData;
+import 
org.apache.aurora.scheduler.updater.JobUpdateController.JobUpdatingException;
 import org.apache.aurora.scheduler.updater.UpdateInProgressException;
 import org.apache.aurora.scheduler.updater.UpdateStateException;
 import org.apache.thrift.TException;
@@ -120,7 +117,7 @@ import static java.util.Objects.requireNonNull;
 
 import static org.apache.aurora.common.base.MorePreconditions.checkNotBlank;
 import static org.apache.aurora.gen.ResponseCode.INVALID_REQUEST;
-import static org.apache.aurora.gen.ResponseCode.LOCK_ERROR;
+import static org.apache.aurora.gen.ResponseCode.JOB_UPDATING_ERROR;
 import static org.apache.aurora.gen.ResponseCode.OK;
 import static org.apache.aurora.scheduler.base.Numbers.convertRanges;
 import static org.apache.aurora.scheduler.base.Numbers.toRanges;
@@ -170,7 +167,6 @@ class SchedulerThriftInterface implements 
AnnotatedAuroraAdmin {
   private final ConfigurationManager configurationManager;
   private final Thresholds thresholds;
   private final NonVolatileStorage storage;
-  private final LockManager lockManager;
   private final StorageBackup backup;
   private final Recovery recovery;
   private final MaintenanceController maintenance;
@@ -199,7 +195,6 @@ class SchedulerThriftInterface implements 
AnnotatedAuroraAdmin {
       ConfigurationManager configurationManager,
       Thresholds thresholds,
       NonVolatileStorage storage,
-      LockManager lockManager,
       StorageBackup backup,
       Recovery recovery,
       CronJobManager cronJobManager,
@@ -216,7 +211,6 @@ class SchedulerThriftInterface implements 
AnnotatedAuroraAdmin {
     this.configurationManager = requireNonNull(configurationManager);
     this.thresholds = requireNonNull(thresholds);
     this.storage = requireNonNull(storage);
-    this.lockManager = requireNonNull(lockManager);
     this.backup = requireNonNull(backup);
     this.recovery = requireNonNull(recovery);
     this.maintenance = requireNonNull(maintenance);
@@ -260,7 +254,7 @@ class SchedulerThriftInterface implements 
AnnotatedAuroraAdmin {
       IJobConfiguration job = sanitized.getJobConfig();
 
       try {
-        
lockManager.assertNotLocked(ILockKey.build(LockKey.job(job.getKey().newBuilder())));
+        jobUpdateController.assertNotUpdating(job.getKey());
 
         checkJobExists(storeProvider, job.getKey());
 
@@ -279,8 +273,8 @@ class SchedulerThriftInterface implements 
AnnotatedAuroraAdmin {
         createJobCounter.addAndGet(sanitized.getInstanceIds().size());
 
         return ok();
-      } catch (LockException e) {
-        return error(LOCK_ERROR, e);
+      } catch (JobUpdatingException e) {
+        return error(JOB_UPDATING_ERROR, e);
       } catch (JobExistsException | TaskValidationException e) {
         return error(INVALID_REQUEST, e);
       }
@@ -321,7 +315,7 @@ class SchedulerThriftInterface implements 
AnnotatedAuroraAdmin {
 
     return storage.write(storeProvider -> {
       try {
-        
lockManager.assertNotLocked(ILockKey.build(LockKey.job(jobKey.newBuilder())));
+        jobUpdateController.assertNotUpdating(jobKey);
 
         int count = sanitized.getJobConfig().getInstanceCount();
 
@@ -340,8 +334,8 @@ class SchedulerThriftInterface implements 
AnnotatedAuroraAdmin {
         createOrUpdateCronCounter.addAndGet(count);
 
         return ok();
-      } catch (LockException e) {
-        return error(LOCK_ERROR, e);
+      } catch (JobUpdatingException e) {
+        return error(JOB_UPDATING_ERROR, e);
       } catch (JobExistsException | TaskValidationException | CronException e) 
{
         return error(INVALID_REQUEST, e);
       }
@@ -362,15 +356,15 @@ class SchedulerThriftInterface implements 
AnnotatedAuroraAdmin {
   public Response descheduleCronJob(JobKey mutableJobKey) {
     try {
       IJobKey jobKey = JobKeys.assertValid(IJobKey.build(mutableJobKey));
-      
lockManager.assertNotLocked(ILockKey.build(LockKey.job(jobKey.newBuilder())));
+      jobUpdateController.assertNotUpdating(jobKey);
 
       if (cronJobManager.deleteJob(jobKey)) {
         return ok();
       } else {
         return addMessage(empty(), OK, notScheduledCronMessage(jobKey));
       }
-    } catch (LockException e) {
-      return error(LOCK_ERROR, e);
+    } catch (JobUpdatingException e) {
+      return error(JOB_UPDATING_ERROR, e);
     }
   }
 
@@ -444,14 +438,14 @@ class SchedulerThriftInterface implements 
AnnotatedAuroraAdmin {
     return readOnlyScheduler.getTierConfigs();
   }
 
-  private void validateLockForTasks(Iterable<IScheduledTask> tasks) throws 
LockException {
+  private void validateLockForTasks(Iterable<IScheduledTask> tasks) throws 
JobUpdatingException {
     ImmutableSet<IJobKey> uniqueKeys = FluentIterable.from(tasks)
         .transform(Tasks::getJob)
         .toSet();
 
     // Validate lock against every unique job key derived from the tasks.
     for (IJobKey key : uniqueKeys) {
-      
lockManager.assertNotLocked(ILockKey.build(LockKey.job(key.newBuilder())));
+      jobUpdateController.assertNotUpdating(key);
     }
   }
 
@@ -479,8 +473,8 @@ class SchedulerThriftInterface implements 
AnnotatedAuroraAdmin {
       Iterable<IScheduledTask> tasks = 
storeProvider.getTaskStore().fetchTasks(query);
       try {
         validateLockForTasks(tasks);
-      } catch (LockException e) {
-        return error(LOCK_ERROR, e);
+      } catch (JobUpdatingException e) {
+        return error(JOB_UPDATING_ERROR, e);
       }
 
       LOG.info("Killing tasks matching " + query);
@@ -511,9 +505,9 @@ class SchedulerThriftInterface implements 
AnnotatedAuroraAdmin {
 
     return storage.write(storeProvider -> {
       try {
-        
lockManager.assertNotLocked(ILockKey.build(LockKey.job(jobKey.newBuilder())));
-      } catch (LockException e) {
-        return error(LOCK_ERROR, e);
+        jobUpdateController.assertNotUpdating(jobKey);
+      } catch (JobUpdatingException e) {
+        return error(JOB_UPDATING_ERROR, e);
       }
 
       Query.Builder query = Query.instanceScoped(jobKey, shardIds).active();
@@ -691,7 +685,7 @@ class SchedulerThriftInterface implements 
AnnotatedAuroraAdmin {
           return invalidRequest("Instances may not be added to cron jobs.");
         }
 
-        
lockManager.assertNotLocked(ILockKey.build(LockKey.job(jobKey.newBuilder())));
+        jobUpdateController.assertNotUpdating(jobKey);
 
         FluentIterable<IScheduledTask> currentTasks = FluentIterable.from(
             
storeProvider.getTaskStore().fetchTasks(Query.jobScoped(jobKey).active()));
@@ -726,8 +720,8 @@ class SchedulerThriftInterface implements 
AnnotatedAuroraAdmin {
         addInstancesCounter.addAndGet(instanceIds.size());
 
         return response.setResponseCode(OK);
-      } catch (LockException e) {
-        return error(LOCK_ERROR, e);
+      } catch (JobUpdatingException e) {
+        return error(JOB_UPDATING_ERROR, e);
       } catch (TaskValidationException | IllegalArgumentException e) {
         return error(INVALID_REQUEST, e);
       }

http://git-wip-us.apache.org/repos/asf/aurora/blob/73e02c06/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateController.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateController.java 
b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateController.java
index c2ec1b3..9159d69 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateController.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateController.java
@@ -21,6 +21,7 @@ import com.google.common.base.Preconditions;
 
 import org.apache.aurora.gen.JobUpdatePulseStatus;
 import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -87,6 +88,24 @@ public interface JobUpdateController {
   void start(IJobUpdate update, AuditData auditData) throws 
UpdateStateException;
 
   /**
+   * Thrown when {@link #assertNotUpdating(IJobKey)} is called and a job was 
updating.
+   */
+  class JobUpdatingException extends Exception {
+    public JobUpdatingException(String msg) {
+      super(msg);
+    }
+  }
+
+  /**
+   * Indicates whether a job is actively updating.  Note that this may include 
'paused' update
+   * states.
+   *
+   * @param job Job to check.
+   * @throws JobUpdatingException if the job is actively updating.
+   */
+  void assertNotUpdating(IJobKey job) throws JobUpdatingException;
+
+  /**
    * Pauses an in-progress update.
    * <p>
    * A paused update may be resumed by invoking {@link #resume(IJobUpdateKey, 
AuditData)}.

http://git-wip-us.apache.org/repos/asf/aurora/blob/73e02c06/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
 
b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
index 5420235..3d35e97 100644
--- 
a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
+++ 
b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
@@ -43,8 +43,6 @@ import org.apache.aurora.gen.JobUpdateEvent;
 import org.apache.aurora.gen.JobUpdatePulseStatus;
 import org.apache.aurora.gen.JobUpdateQuery;
 import org.apache.aurora.gen.JobUpdateStatus;
-import org.apache.aurora.gen.Lock;
-import org.apache.aurora.gen.LockKey;
 import org.apache.aurora.scheduler.BatchWorker;
 import org.apache.aurora.scheduler.SchedulerModule.TaskEventBatchWorker;
 import org.apache.aurora.scheduler.base.InstanceKeys;
@@ -68,7 +66,6 @@ import 
org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary;
 import org.apache.aurora.scheduler.storage.entities.ILock;
-import org.apache.aurora.scheduler.storage.entities.ILockKey;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.updater.StateEvaluator.Failure;
 import org.slf4j.Logger;
@@ -194,9 +191,7 @@ class JobUpdateControllerImpl implements 
JobUpdateController {
       LOG.info("Starting update for job " + job);
       ILock lock;
       try {
-        lock = lockManager.acquireLock(
-            ILockKey.build(LockKey.job(job.newBuilder())),
-            auditData.getUser());
+        lock = lockManager.acquireLock(job, auditData.getUser());
       } catch (LockException e) {
         throw new UpdateStateException(e.getMessage(), e);
       }
@@ -219,6 +214,17 @@ class JobUpdateControllerImpl implements 
JobUpdateController {
   }
 
   @Override
+  public void assertNotUpdating(IJobKey job) throws JobUpdatingException {
+    requireNonNull(job);
+
+    if (storage.read(p -> !p.getJobUpdateStore()
+        .fetchJobUpdateSummaries(queryActiveByJob(job)).isEmpty())) {
+
+      throw new JobUpdatingException("Job is currently updating");
+    }
+  }
+
+  @Override
   public void pause(final IJobUpdateKey key, AuditData auditData) throws 
UpdateStateException {
     requireNonNull(key);
     LOG.info("Attempting to pause update " + key);
@@ -476,21 +482,10 @@ class JobUpdateControllerImpl implements 
JobUpdateController {
       MutableStoreProvider storeProvider,
       IJobUpdateKey key,
       JobUpdateEvent proposedEvent,
-      boolean recordChange) throws UpdateStateException {
-
-    JobUpdateStatus status;
-    boolean record;
+      boolean record) throws UpdateStateException {
 
+    JobUpdateStatus status = proposedEvent.getStatus();
     JobUpdateStore.Mutable updateStore = storeProvider.getJobUpdateStore();
-    Optional<String> updateLock = updateStore.getLockToken(key);
-    if (updateLock.isPresent()) {
-      status = proposedEvent.getStatus();
-      record = recordChange;
-    } else {
-      LOG.error("Update " + key + " does not have a lock");
-      status = ERROR;
-      record = true;
-    }
 
     LOG.info("Update {} is now in state {}", key, status);
     if (record) {
@@ -500,12 +495,7 @@ class JobUpdateControllerImpl implements 
JobUpdateController {
     }
 
     if (TERMINAL_STATES.contains(status)) {
-      if (updateLock.isPresent()) {
-        lockManager.releaseLock(ILock.build(new Lock()
-            .setKey(LockKey.job(key.getJob().newBuilder()))
-            .setToken(updateLock.get())));
-      }
-
+      lockManager.releaseLock(key.getJob());
       pulseHandler.remove(key);
     } else {
       pulseHandler.updatePulseStatus(key, status);
@@ -590,13 +580,6 @@ class JobUpdateControllerImpl implements 
JobUpdateController {
     final IJobUpdateKey key = summary.getKey();
 
     JobUpdateStore.Mutable updateStore = storeProvider.getJobUpdateStore();
-    if (!updateStore.getLockToken(key).isPresent()) {
-      recordAndChangeJobUpdateStatus(
-          storeProvider,
-          key,
-          newEvent(ERROR).setMessage(LOST_LOCK_MESSAGE));
-      return;
-    }
 
     IJobUpdateInstructions instructions = 
updateStore.fetchJobUpdateInstructions(key).get();
     if (isCoordinatedAndPulseExpired(key, instructions)) {

http://git-wip-us.apache.org/repos/asf/aurora/blob/73e02c06/src/main/python/apache/aurora/client/cli/context.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/cli/context.py 
b/src/main/python/apache/aurora/client/cli/context.py
index 9cf5839..06b1941 100644
--- a/src/main/python/apache/aurora/client/cli/context.py
+++ b/src/main/python/apache/aurora/client/cli/context.py
@@ -73,7 +73,7 @@ def add_auth_error_handler(api):
 
 class AuroraCommandContext(Context):
 
-  LOCK_ERROR_MSG = """Error: job is locked by an active update.
+  JOB_UPDATING_ERROR_MSG = """Error: job is locked by an active update.
       Run 'aurora update abort' or wait for the active update to finish."""
 
   """A context object used by Aurora commands to manage command processing 
state
@@ -109,8 +109,8 @@ class AuroraCommandContext(Context):
     else:
       self.print_err(err_msg)
       self.print_err("\t%s" % combine_messages(resp))
-      if resp.responseCode == ResponseCode.LOCK_ERROR:
-        self.print_err("\t%s" % self.LOCK_ERROR_MSG)
+      if resp.responseCode == ResponseCode.JOB_UPDATING_ERROR:
+        self.print_err("\t%s" % self.JOB_UPDATING_ERROR_MSG)
       raise self.CommandErrorLogged(err_code, err_msg)
 
   def _get_tier_configurations(self, cluster):

http://git-wip-us.apache.org/repos/asf/aurora/blob/73e02c06/src/main/resources/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.xml
----------------------------------------------------------------------
diff --git 
a/src/main/resources/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.xml
 
b/src/main/resources/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.xml
index f56ad67..aded483 100644
--- 
a/src/main/resources/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.xml
+++ 
b/src/main/resources/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.xml
@@ -446,15 +446,6 @@
     <include refid="unscoped_details_select"/>
   </select>
 
-  <select id="selectLockToken" resultType="String">
-    SELECT
-      lock_token
-    FROM job_update_locks AS l
-    INNER JOIN job_updates u ON l.update_row_id = u.id
-    <include refid="job_key_inner_join" />
-    WHERE <include refid="filter_by_update_key"/>
-  </select>
-
   <select id="selectInstanceConfigs" resultMap="instanceConfigMap">
     SELECT
       co.id,

http://git-wip-us.apache.org/repos/asf/aurora/blob/73e02c06/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java 
b/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java
index 19f9de3..c56b6f9 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java
@@ -13,13 +13,7 @@
  */
 package org.apache.aurora.scheduler.state;
 
-import java.util.Optional;
 import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-
-import com.google.common.base.Throwables;
-import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 import org.apache.aurora.common.quantity.Amount;
 import org.apache.aurora.common.quantity.Time;
@@ -32,15 +26,11 @@ import 
org.apache.aurora.scheduler.state.LockManager.LockException;
 import org.apache.aurora.scheduler.storage.db.DbUtil;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.ILock;
-import org.apache.aurora.scheduler.storage.entities.ILockKey;
-import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
-import org.easymock.EasyMock;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
-import static org.apache.aurora.scheduler.storage.Storage.Work;
 import static org.easymock.EasyMock.expect;
 import static org.junit.Assert.assertEquals;
 
@@ -48,11 +38,8 @@ public class LockManagerImplTest extends EasyMockTest {
   private static final String USER = "jim-user";
   private static final String MY_JOB = "myJob";
   private static final IJobKey JOB_KEY = JobKeys.from("jim", "devel", MY_JOB);
-  private static final ILockKey LOCK_KEY = 
ILockKey.build(LockKey.job(JOB_KEY.newBuilder()));
   private static final UUID TOKEN = 
UUID.fromString("79d6d790-3212-11e3-aa6e-0800200c9a66");
 
-  private FakeClock clock;
-  private UUIDGenerator tokenGenerator;
   private LockManager lockManager;
   private long timestampMs;
 
@@ -61,11 +48,11 @@ public class LockManagerImplTest extends EasyMockTest {
 
   @Before
   public void setUp() throws Exception {
-    clock = new FakeClock();
+    FakeClock clock = new FakeClock();
     clock.advance(Amount.of(12345L, Time.SECONDS));
     timestampMs = clock.nowMillis();
 
-    tokenGenerator = createMock(UUIDGenerator.class);
+    UUIDGenerator tokenGenerator = createMock(UUIDGenerator.class);
     expect(tokenGenerator.createNew()).andReturn(TOKEN).anyTimes();
 
     lockManager = new LockManagerImpl(DbUtil.createStorage(), clock, 
tokenGenerator);
@@ -76,12 +63,12 @@ public class LockManagerImplTest extends EasyMockTest {
     control.replay();
 
     ILock expected = ILock.build(new Lock()
-        .setKey(LOCK_KEY.newBuilder())
+        .setKey(LockKey.job(JOB_KEY.newBuilder()))
         .setToken(TOKEN.toString())
         .setTimestampMs(timestampMs)
         .setUser(USER));
 
-    ILock actual = lockManager.acquireLock(expected.getKey(), USER);
+    ILock actual = lockManager.acquireLock(JOB_KEY, USER);
     assertEquals(expected, actual);
   }
 
@@ -90,69 +77,19 @@ public class LockManagerImplTest extends EasyMockTest {
     control.replay();
 
     expectLockException(JOB_KEY);
-    lockManager.acquireLock(LOCK_KEY, USER);
-    lockManager.acquireLock(LOCK_KEY, USER);
+    lockManager.acquireLock(JOB_KEY, USER);
+    lockManager.acquireLock(JOB_KEY, USER);
   }
 
   @Test
   public void testReleaseLock() throws Exception {
     control.replay();
 
-    ILock lock = lockManager.acquireLock(LOCK_KEY, USER);
-    lockManager.releaseLock(lock);
+    lockManager.acquireLock(JOB_KEY, USER);
+    lockManager.releaseLock(JOB_KEY);
 
     // Should be able to lock again after releasing.
-    lockManager.acquireLock(LOCK_KEY, USER);
-  }
-
-  @Test
-  public void testGetLocks() throws Exception {
-    control.replay();
-
-    ILock lock = lockManager.acquireLock(LOCK_KEY, USER);
-    assertEquals(lock, Iterables.getOnlyElement(lockManager.getLocks()));
-  }
-
-  // Test for regression of AURORA-702.
-  @Test
-  public void testNoDeadlock() throws Exception {
-    final StorageTestUtil storageUtil = new StorageTestUtil(this);
-
-    
expect(storageUtil.storeProvider.getLockStore()).andReturn(storageUtil.lockStore).atLeastOnce();
-    expect(storageUtil.lockStore.fetchLock(LOCK_KEY))
-        .andReturn(Optional.empty())
-        .atLeastOnce();
-
-    final CountDownLatch reads = new CountDownLatch(2);
-    EasyMock.makeThreadSafe(storageUtil.storage, false);
-    expect(storageUtil.storage.read(EasyMock.anyObject()))
-        .andAnswer(() -> {
-          @SuppressWarnings("unchecked")
-          Work<?, ?> work = (Work<?, ?>) EasyMock.getCurrentArguments()[0];
-          Object result = work.apply(storageUtil.storeProvider);
-          reads.countDown();
-          reads.await();
-          return result;
-        }).atLeastOnce();
-
-    lockManager = new LockManagerImpl(storageUtil.storage, clock, 
tokenGenerator);
-
-    control.replay();
-
-    new ThreadFactoryBuilder()
-        .setDaemon(true)
-        .setNameFormat("LockRead-%s")
-        .build()
-        .newThread(() -> {
-          try {
-            lockManager.assertNotLocked(LOCK_KEY);
-          } catch (LockException e) {
-            throw Throwables.propagate(e);
-          }
-        })
-        .start();
-
-    lockManager.assertNotLocked(LOCK_KEY);
+    lockManager.acquireLock(JOB_KEY, USER);
   }
 
   private void expectLockException(IJobKey key) {

http://git-wip-us.apache.org/repos/asf/aurora/blob/73e02c06/src/test/java/org/apache/aurora/scheduler/storage/db/DbJobUpdateStoreTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/aurora/scheduler/storage/db/DbJobUpdateStoreTest.java
 
b/src/test/java/org/apache/aurora/scheduler/storage/db/DbJobUpdateStoreTest.java
index 8fca54b..453c039 100644
--- 
a/src/test/java/org/apache/aurora/scheduler/storage/db/DbJobUpdateStoreTest.java
+++ 
b/src/test/java/org/apache/aurora/scheduler/storage/db/DbJobUpdateStoreTest.java
@@ -92,8 +92,6 @@ public class DbJobUpdateStoreTest {
   private static final IJobKey JOB = JobKeys.from("testRole", "testEnv", 
"job");
   private static final IJobUpdateKey UPDATE1 =
       IJobUpdateKey.build(new JobUpdateKey(JOB.newBuilder(), "update1"));
-  private static final IJobUpdateKey UPDATE2 = IJobUpdateKey.build(
-      new JobUpdateKey(JobKeys.from("testRole", "testEnv", 
"job2").newBuilder(), "update2"));
   private static final long CREATED_MS = 111L;
   private static final IJobUpdateEvent FIRST_EVENT =
       makeJobUpdateEvent(ROLLING_FORWARD, CREATED_MS);
@@ -659,39 +657,6 @@ public class DbJobUpdateStoreTest {
     saveUpdate(makeJobUpdate(makeKey("update2")), Optional.of("lock2"));
   }
 
-  private static final Optional<String> NO_TOKEN = Optional.absent();
-
-  @Test
-  public void testGetLockToken() {
-    storage.write((NoResult.Quiet) storeProvider -> {
-      IJobUpdate update1 = makeJobUpdate(UPDATE1);
-      IJobUpdate update2 = makeJobUpdate(UPDATE2);
-      saveUpdate(update1, Optional.of("lock1"));
-      assertEquals(
-          Optional.of("lock1"),
-          storeProvider.getJobUpdateStore().getLockToken(UPDATE1));
-      assertEquals(NO_TOKEN, 
storeProvider.getJobUpdateStore().getLockToken(UPDATE2));
-
-      saveUpdate(update2, Optional.of("lock2"));
-      assertEquals(
-          Optional.of("lock1"),
-          storeProvider.getJobUpdateStore().getLockToken(UPDATE1));
-      assertEquals(
-          Optional.of("lock2"),
-          storeProvider.getJobUpdateStore().getLockToken(UPDATE2));
-
-      storeProvider.getLockStore().removeLock(makeLock(update1, 
"lock1").getKey());
-      assertEquals(NO_TOKEN, 
storeProvider.getJobUpdateStore().getLockToken(UPDATE1));
-      assertEquals(
-          Optional.of("lock2"),
-          storeProvider.getJobUpdateStore().getLockToken(UPDATE2));
-
-      storeProvider.getLockStore().removeLock(makeLock(update2, 
"lock2").getKey());
-      assertEquals(NO_TOKEN, 
storeProvider.getJobUpdateStore().getLockToken(UPDATE1));
-      assertEquals(NO_TOKEN, 
storeProvider.getJobUpdateStore().getLockToken(UPDATE2));
-    });
-  }
-
   @Test
   public void testGetSummaries() {
     String role1 = "role1";

http://git-wip-us.apache.org/repos/asf/aurora/blob/73e02c06/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
 
b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
index b48477a..1691477 100644
--- 
a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
+++ 
b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
@@ -47,7 +47,6 @@ import org.apache.aurora.gen.JobUpdateSettings;
 import org.apache.aurora.gen.JobUpdateSummary;
 import org.apache.aurora.gen.LimitConstraint;
 import org.apache.aurora.gen.ListBackupsResult;
-import org.apache.aurora.gen.LockKey;
 import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.gen.MesosContainer;
 import org.apache.aurora.gen.Metadata;
@@ -69,7 +68,6 @@ import org.apache.aurora.gen.TaskConstraint;
 import org.apache.aurora.gen.TaskQuery;
 import org.apache.aurora.gen.ValueConstraint;
 import org.apache.aurora.gen.apiConstants;
-import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.TaskTestUtil;
 import org.apache.aurora.scheduler.base.Tasks;
@@ -82,8 +80,6 @@ import org.apache.aurora.scheduler.cron.SanitizedCronJob;
 import org.apache.aurora.scheduler.quota.QuotaCheckResult;
 import org.apache.aurora.scheduler.quota.QuotaManager;
 import org.apache.aurora.scheduler.reconciliation.TaskReconciler;
-import org.apache.aurora.scheduler.state.LockManager;
-import org.apache.aurora.scheduler.state.LockManager.LockException;
 import org.apache.aurora.scheduler.state.MaintenanceController;
 import org.apache.aurora.scheduler.state.StateChangeResult;
 import org.apache.aurora.scheduler.state.StateManager;
@@ -94,7 +90,6 @@ import 
org.apache.aurora.scheduler.storage.backup.StorageBackup;
 import org.apache.aurora.scheduler.storage.entities.IHostStatus;
 import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
-import org.apache.aurora.scheduler.storage.entities.ILockKey;
 import org.apache.aurora.scheduler.storage.entities.IMetadata;
 import org.apache.aurora.scheduler.storage.entities.IRange;
 import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
@@ -104,6 +99,7 @@ import 
org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
 import org.apache.aurora.scheduler.testing.FakeStatsProvider;
 import org.apache.aurora.scheduler.updater.JobUpdateController;
 import org.apache.aurora.scheduler.updater.JobUpdateController.AuditData;
+import 
org.apache.aurora.scheduler.updater.JobUpdateController.JobUpdatingException;
 import org.apache.aurora.scheduler.updater.UpdateInProgressException;
 import org.apache.aurora.scheduler.updater.UpdateStateException;
 import org.apache.thrift.TException;
@@ -119,7 +115,7 @@ import static org.apache.aurora.gen.Resource.diskMb;
 import static org.apache.aurora.gen.Resource.numCpus;
 import static org.apache.aurora.gen.Resource.ramMb;
 import static org.apache.aurora.gen.ResponseCode.INVALID_REQUEST;
-import static org.apache.aurora.gen.ResponseCode.LOCK_ERROR;
+import static org.apache.aurora.gen.ResponseCode.JOB_UPDATING_ERROR;
 import static org.apache.aurora.gen.ResponseCode.OK;
 import static 
org.apache.aurora.scheduler.configuration.ConfigurationManager.DEDICATED_ATTRIBUTE;
 import static 
org.apache.aurora.scheduler.storage.backup.Recovery.RecoveryException;
@@ -130,7 +126,6 @@ import static 
org.apache.aurora.scheduler.thrift.Fixtures.INSTANCE_KEY;
 import static org.apache.aurora.scheduler.thrift.Fixtures.INVALID_TASK_CONFIG;
 import static org.apache.aurora.scheduler.thrift.Fixtures.JOB_KEY;
 import static org.apache.aurora.scheduler.thrift.Fixtures.JOB_NAME;
-import static org.apache.aurora.scheduler.thrift.Fixtures.LOCK_KEY;
 import static org.apache.aurora.scheduler.thrift.Fixtures.NOT_ENOUGH_QUOTA;
 import static org.apache.aurora.scheduler.thrift.Fixtures.ROLE;
 import static org.apache.aurora.scheduler.thrift.Fixtures.TASK_ID;
@@ -181,7 +176,6 @@ public class SchedulerThriftInterfaceTest extends 
EasyMockTest {
       ImmutableSet.of(new Metadata("k1", "v1"), new Metadata("k2", "v2"));
 
   private StorageTestUtil storageUtil;
-  private LockManager lockManager;
   private StorageBackup backup;
   private Recovery recovery;
   private MaintenanceController maintenance;
@@ -200,7 +194,6 @@ public class SchedulerThriftInterfaceTest extends 
EasyMockTest {
   public void setUp() throws Exception {
     storageUtil = new StorageTestUtil(this);
     storageUtil.expectOperations();
-    lockManager = createMock(LockManager.class);
     backup = createMock(StorageBackup.class);
     recovery = createMock(Recovery.class);
     maintenance = createMock(MaintenanceController.class);
@@ -219,7 +212,6 @@ public class SchedulerThriftInterfaceTest extends 
EasyMockTest {
             TaskTestUtil.CONFIGURATION_MANAGER,
             THRESHOLDS,
             storageUtil.storage,
-            lockManager,
             backup,
             recovery,
             cronJobManager,
@@ -264,7 +256,7 @@ public class SchedulerThriftInterfaceTest extends 
EasyMockTest {
   public void testCreateJobNoLock() throws Exception {
     IJobConfiguration job = IJobConfiguration.build(makeProdJob());
     SanitizedConfiguration sanitized = fromUnsanitized(job);
-    lockManager.assertNotLocked(LOCK_KEY);
+    jobUpdateController.assertNotUpdating(JOB_KEY);
     storageUtil.expectTaskFetch(Query.jobScoped(JOB_KEY).active());
     expectNoCronJob();
     expectInstanceQuotaCheck(sanitized, ENOUGH_QUOTA);
@@ -284,7 +276,7 @@ public class SchedulerThriftInterfaceTest extends 
EasyMockTest {
   public void testCreateJobWithLock() throws Exception {
     IJobConfiguration job = IJobConfiguration.build(makeProdJob());
     SanitizedConfiguration sanitized = fromUnsanitized(job);
-    lockManager.assertNotLocked(LOCK_KEY);
+    jobUpdateController.assertNotUpdating(JOB_KEY);
     storageUtil.expectTaskFetch(Query.jobScoped(JOB_KEY).active());
     expectNoCronJob();
     expectInstanceQuotaCheck(sanitized, ENOUGH_QUOTA);
@@ -322,19 +314,19 @@ public class SchedulerThriftInterfaceTest extends 
EasyMockTest {
   @Test
   public void testCreateJobFailsLockCheck() throws Exception {
     IJobConfiguration job = IJobConfiguration.build(makeJob());
-    lockManager.assertNotLocked(LOCK_KEY);
-    expectLastCall().andThrow(new LockException("Locked"));
+    jobUpdateController.assertNotUpdating(JOB_KEY);
+    expectLastCall().andThrow(new JobUpdatingException("Job is updating"));
 
     control.replay();
 
-    assertResponse(LOCK_ERROR, thrift.createJob(job.newBuilder()));
+    assertResponse(JOB_UPDATING_ERROR, thrift.createJob(job.newBuilder()));
     assertEquals(0L, statsProvider.getLongValue(CREATE_JOB));
   }
 
   @Test
   public void testCreateJobFailsJobExists() throws Exception {
     IJobConfiguration job = IJobConfiguration.build(makeJob());
-    lockManager.assertNotLocked(LOCK_KEY);
+    jobUpdateController.assertNotUpdating(JOB_KEY);
     storageUtil.expectTaskFetch(Query.jobScoped(JOB_KEY).active(), 
buildScheduledTask());
 
     control.replay();
@@ -346,7 +338,7 @@ public class SchedulerThriftInterfaceTest extends 
EasyMockTest {
   @Test
   public void testCreateJobFailsCronJobExists() throws Exception {
     IJobConfiguration job = IJobConfiguration.build(makeJob());
-    lockManager.assertNotLocked(LOCK_KEY);
+    jobUpdateController.assertNotUpdating(JOB_KEY);
     storageUtil.expectTaskFetch(Query.jobScoped(JOB_KEY).active());
     expectCronJob();
 
@@ -361,7 +353,7 @@ public class SchedulerThriftInterfaceTest extends 
EasyMockTest {
     IJobConfiguration job = IJobConfiguration.build(
         makeJob(defaultTask(true), THRESHOLDS.getMaxTasksPerJob() + 1));
 
-    lockManager.assertNotLocked(LOCK_KEY);
+    jobUpdateController.assertNotUpdating(JOB_KEY);
     storageUtil.expectTaskFetch(Query.jobScoped(JOB_KEY).active());
     expectNoCronJob();
     expect(quotaManager.checkInstanceAddition(
@@ -379,7 +371,7 @@ public class SchedulerThriftInterfaceTest extends 
EasyMockTest {
   public void testCreateJobFailsQuotaCheck() throws Exception {
     IJobConfiguration job = IJobConfiguration.build(makeProdJob());
     SanitizedConfiguration sanitized = fromUnsanitized(job);
-    lockManager.assertNotLocked(LOCK_KEY);
+    jobUpdateController.assertNotUpdating(JOB_KEY);
     storageUtil.expectTaskFetch(Query.jobScoped(JOB_KEY).active());
     expectNoCronJob();
     expectInstanceQuotaCheck(sanitized, NOT_ENOUGH_QUOTA);
@@ -501,7 +493,7 @@ public class SchedulerThriftInterfaceTest extends 
EasyMockTest {
 
     IJobConfiguration job = IJobConfiguration.build(makeJob(task));
     SanitizedConfiguration sanitized = fromUnsanitized(job);
-    lockManager.assertNotLocked(LOCK_KEY);
+    jobUpdateController.assertNotUpdating(JOB_KEY);
     storageUtil.expectTaskFetch(Query.jobScoped(JOB_KEY).active());
     expectNoCronJob();
     expectInstanceQuotaCheck(sanitized, ENOUGH_QUOTA);
@@ -543,7 +535,7 @@ public class SchedulerThriftInterfaceTest extends 
EasyMockTest {
         .setMaxTaskFailures(0)
         .setResources(resources);
 
-    lockManager.assertNotLocked(LOCK_KEY);
+    jobUpdateController.assertNotUpdating(JOB_KEY);
     storageUtil.expectTaskFetch(Query.jobScoped(JOB_KEY).active());
     expectNoCronJob();
     expectInstanceQuotaCheck(ITaskConfig.build(sanitized.getTaskConfig()), 
ENOUGH_QUOTA);
@@ -634,7 +626,7 @@ public class SchedulerThriftInterfaceTest extends 
EasyMockTest {
   public void testJobScopedKillsActive() throws Exception {
     Query.Builder query = Query.unscoped().byJob(JOB_KEY).active();
     storageUtil.expectTaskFetch(query, buildScheduledTask());
-    lockManager.assertNotLocked(LOCK_KEY);
+    jobUpdateController.assertNotUpdating(JOB_KEY);
     expectTransitionsToKilling();
 
     control.replay();
@@ -647,7 +639,7 @@ public class SchedulerThriftInterfaceTest extends 
EasyMockTest {
   public void testInstanceScoped() throws Exception {
     Query.Builder query = Query.instanceScoped(JOB_KEY, 
ImmutableSet.of(1)).active();
     storageUtil.expectTaskFetch(query, buildScheduledTask());
-    lockManager.assertNotLocked(LOCK_KEY);
+    jobUpdateController.assertNotUpdating(JOB_KEY);
     expectTransitionsToKilling();
 
     control.replay();
@@ -660,17 +652,15 @@ public class SchedulerThriftInterfaceTest extends 
EasyMockTest {
   public void testKillTasksLockCheckFailed() throws Exception {
     Query.Builder query = Query.unscoped().byJob(JOB_KEY).active();
     IScheduledTask task2 = buildScheduledTask("job_bar", TASK_ID);
-    ILockKey key2 = ILockKey.build(LockKey.job(
-        JobKeys.from(ROLE, "devel", "job_bar").newBuilder()));
     storageUtil.expectTaskFetch(query, buildScheduledTask(), task2);
-    lockManager.assertNotLocked(LOCK_KEY);
-    lockManager.assertNotLocked(key2);
-    expectLastCall().andThrow(new LockException("Failed lock check."));
+    jobUpdateController.assertNotUpdating(JOB_KEY);
+    
jobUpdateController.assertNotUpdating(task2.getAssignedTask().getTask().getJob());
+    expectLastCall().andThrow(new JobUpdatingException("Job is updating"));
 
     control.replay();
 
     assertResponse(
-        LOCK_ERROR,
+        JOB_UPDATING_ERROR,
         thrift.killTasks(JOB_KEY.newBuilder(), null, null));
     assertEquals(0L, statsProvider.getLongValue(KILL_TASKS));
   }
@@ -693,7 +683,7 @@ public class SchedulerThriftInterfaceTest extends 
EasyMockTest {
     String message = "Test message";
     Query.Builder query = Query.unscoped().byJob(JOB_KEY).active();
     storageUtil.expectTaskFetch(query, buildScheduledTask());
-    lockManager.assertNotLocked(LOCK_KEY);
+    jobUpdateController.assertNotUpdating(JOB_KEY);
     expectTransitionsToKilling(Optional.of(message));
 
     control.replay();
@@ -879,7 +869,7 @@ public class SchedulerThriftInterfaceTest extends 
EasyMockTest {
   public void testRestartShards() throws Exception {
     Set<Integer> shards = ImmutableSet.of(0);
 
-    lockManager.assertNotLocked(LOCK_KEY);
+    jobUpdateController.assertNotUpdating(JOB_KEY);
     storageUtil.expectTaskFetch(
         Query.instanceScoped(JOB_KEY, shards).active(),
         buildScheduledTask());
@@ -904,13 +894,13 @@ public class SchedulerThriftInterfaceTest extends 
EasyMockTest {
   public void testRestartShardsLockCheckFails() throws Exception {
     Set<Integer> shards = ImmutableSet.of(1, 6);
 
-    lockManager.assertNotLocked(LOCK_KEY);
-    expectLastCall().andThrow(new LockException("test"));
+    jobUpdateController.assertNotUpdating(JOB_KEY);
+    expectLastCall().andThrow(new JobUpdatingException("job is updating"));
 
     control.replay();
 
     assertResponse(
-        LOCK_ERROR,
+        JOB_UPDATING_ERROR,
         thrift.restartShards(JOB_KEY.newBuilder(), shards));
     assertEquals(0L, statsProvider.getLongValue(KILL_TASKS));
   }
@@ -919,7 +909,7 @@ public class SchedulerThriftInterfaceTest extends 
EasyMockTest {
   public void testRestartShardsNotFoundTasksFailure() throws Exception {
     Set<Integer> shards = ImmutableSet.of(1, 6);
 
-    lockManager.assertNotLocked(LOCK_KEY);
+    jobUpdateController.assertNotUpdating(JOB_KEY);
     storageUtil.expectTaskFetch(Query.instanceScoped(JOB_KEY, 
shards).active());
 
     control.replay();
@@ -932,7 +922,7 @@ public class SchedulerThriftInterfaceTest extends 
EasyMockTest {
 
   @Test
   public void testReplaceCronTemplate() throws Exception {
-    lockManager.assertNotLocked(LOCK_KEY);
+    jobUpdateController.assertNotUpdating(JOB_KEY);
     SanitizedConfiguration sanitized = 
fromUnsanitized(IJobConfiguration.build(CRON_JOB));
 
     expectCronQuotaCheck(sanitized.getJobConfig(), ENOUGH_QUOTA);
@@ -945,17 +935,17 @@ public class SchedulerThriftInterfaceTest extends 
EasyMockTest {
 
   @Test
   public void testReplaceCronTemplateFailedLockValidation() throws Exception {
-    lockManager.assertNotLocked(LOCK_KEY);
-    expectLastCall().andThrow(new LockException("Failed lock."));
+    jobUpdateController.assertNotUpdating(JOB_KEY);
+    expectLastCall().andThrow(new JobUpdatingException("job is updating"));
     control.replay();
 
-    assertResponse(LOCK_ERROR, thrift.replaceCronTemplate(CRON_JOB));
+    assertResponse(JOB_UPDATING_ERROR, thrift.replaceCronTemplate(CRON_JOB));
     assertEquals(0L, statsProvider.getLongValue(CREATE_OR_UPDATE_CRON));
   }
 
   @Test
   public void testReplaceCronTemplateDoesNotExist() throws Exception {
-    lockManager.assertNotLocked(LOCK_KEY);
+    jobUpdateController.assertNotUpdating(JOB_KEY);
     SanitizedConfiguration sanitized = 
fromUnsanitized(IJobConfiguration.build(CRON_JOB));
 
     expectCronQuotaCheck(sanitized.getJobConfig(), ENOUGH_QUOTA);
@@ -985,7 +975,7 @@ public class SchedulerThriftInterfaceTest extends 
EasyMockTest {
 
   @Test
   public void testScheduleCronCreatesJob() throws Exception {
-    lockManager.assertNotLocked(LOCK_KEY);
+    jobUpdateController.assertNotUpdating(JOB_KEY);
     SanitizedConfiguration sanitized = 
fromUnsanitized(IJobConfiguration.build(CRON_JOB));
 
     expectCronQuotaCheck(sanitized.getJobConfig(), ENOUGH_QUOTA);
@@ -999,7 +989,7 @@ public class SchedulerThriftInterfaceTest extends 
EasyMockTest {
 
   @Test
   public void testScheduleCronFailsCreationDueToExistingNonCron() throws 
Exception {
-    lockManager.assertNotLocked(LOCK_KEY);
+    jobUpdateController.assertNotUpdating(JOB_KEY);
     SanitizedConfiguration sanitized = 
fromUnsanitized(IJobConfiguration.build(CRON_JOB));
 
     expectCronQuotaCheck(sanitized.getJobConfig(), ENOUGH_QUOTA);
@@ -1014,7 +1004,7 @@ public class SchedulerThriftInterfaceTest extends 
EasyMockTest {
 
   @Test
   public void testScheduleCronUpdatesJob() throws Exception {
-    lockManager.assertNotLocked(LOCK_KEY);
+    jobUpdateController.assertNotUpdating(JOB_KEY);
     SanitizedConfiguration sanitized = 
fromUnsanitized(IJobConfiguration.build(CRON_JOB));
 
     expectCronQuotaCheck(sanitized.getJobConfig(), ENOUGH_QUOTA);
@@ -1035,10 +1025,10 @@ public class SchedulerThriftInterfaceTest extends 
EasyMockTest {
 
   @Test
   public void testScheduleCronJobFailsLockValidation() throws Exception {
-    lockManager.assertNotLocked(LOCK_KEY);
-    expectLastCall().andThrow(new LockException("Failed lock"));
+    jobUpdateController.assertNotUpdating(JOB_KEY);
+    expectLastCall().andThrow(new JobUpdatingException("job is updating"));
     control.replay();
-    assertResponse(LOCK_ERROR, thrift.scheduleCronJob(CRON_JOB));
+    assertResponse(JOB_UPDATING_ERROR, thrift.scheduleCronJob(CRON_JOB));
   }
 
   @Test
@@ -1052,7 +1042,7 @@ public class SchedulerThriftInterfaceTest extends 
EasyMockTest {
 
   @Test
   public void testScheduleCronFailsQuotaCheck() throws Exception {
-    lockManager.assertNotLocked(LOCK_KEY);
+    jobUpdateController.assertNotUpdating(JOB_KEY);
     SanitizedConfiguration sanitized = 
fromUnsanitized(IJobConfiguration.build(CRON_JOB));
 
     expectCronQuotaCheck(sanitized.getJobConfig(), NOT_ENOUGH_QUOTA);
@@ -1063,7 +1053,7 @@ public class SchedulerThriftInterfaceTest extends 
EasyMockTest {
 
   @Test
   public void testDescheduleCronJob() throws Exception {
-    lockManager.assertNotLocked(LOCK_KEY);
+    jobUpdateController.assertNotUpdating(JOB_KEY);
     expect(cronJobManager.deleteJob(JOB_KEY)).andReturn(true);
 
     control.replay();
@@ -1073,15 +1063,15 @@ public class SchedulerThriftInterfaceTest extends 
EasyMockTest {
 
   @Test
   public void testDescheduleCronJobFailsLockValidation() throws Exception {
-    lockManager.assertNotLocked(LOCK_KEY);
-    expectLastCall().andThrow(new LockException("Failed lock"));
+    jobUpdateController.assertNotUpdating(JOB_KEY);
+    expectLastCall().andThrow(new JobUpdatingException("job is updating"));
     control.replay();
-    assertResponse(LOCK_ERROR, thrift.descheduleCronJob(CRON_JOB.getKey()));
+    assertResponse(JOB_UPDATING_ERROR, 
thrift.descheduleCronJob(CRON_JOB.getKey()));
   }
 
   @Test
   public void testDescheduleNotACron() throws Exception {
-    lockManager.assertNotLocked(LOCK_KEY);
+    jobUpdateController.assertNotUpdating(JOB_KEY);
     expect(cronJobManager.deleteJob(JOB_KEY)).andReturn(false);
     control.replay();
 
@@ -1243,7 +1233,7 @@ public class SchedulerThriftInterfaceTest extends 
EasyMockTest {
   @Test
   public void testAddInstancesWithInstanceKey() throws Exception {
     expectNoCronJob();
-    lockManager.assertNotLocked(LOCK_KEY);
+    jobUpdateController.assertNotUpdating(JOB_KEY);
     IScheduledTask activeTask = buildScheduledTask();
     ITaskConfig task = activeTask.getAssignedTask().getTask();
     storageUtil.expectTaskFetch(Query.jobScoped(JOB_KEY).active(), activeTask);
@@ -1265,7 +1255,7 @@ public class SchedulerThriftInterfaceTest extends 
EasyMockTest {
   @Test
   public void testAddInstancesWithInstanceKeyFailsWithNoInstance() throws 
Exception {
     expectNoCronJob();
-    lockManager.assertNotLocked(LOCK_KEY);
+    jobUpdateController.assertNotUpdating(JOB_KEY);
     storageUtil.expectTaskFetch(Query.jobScoped(JOB_KEY).active());
 
     control.replay();
@@ -1279,7 +1269,7 @@ public class SchedulerThriftInterfaceTest extends 
EasyMockTest {
   @Test
   public void testAddInstancesWithInstanceKeyFailsInvalidCount() throws 
Exception {
     expectNoCronJob();
-    lockManager.assertNotLocked(LOCK_KEY);
+    jobUpdateController.assertNotUpdating(JOB_KEY);
     storageUtil.expectTaskFetch(Query.jobScoped(JOB_KEY).active());
 
     control.replay();
@@ -1311,12 +1301,12 @@ public class SchedulerThriftInterfaceTest extends 
EasyMockTest {
   @Test
   public void testAddInstancesLockCheckFails() throws Exception {
     expectNoCronJob();
-    lockManager.assertNotLocked(LOCK_KEY);
-    expectLastCall().andThrow(new LockException("Failed lock check."));
+    jobUpdateController.assertNotUpdating(JOB_KEY);
+    expectLastCall().andThrow(new JobUpdatingException("job is updating"));
 
     control.replay();
 
-    assertResponse(LOCK_ERROR, thrift.addInstances(INSTANCE_KEY, 1));
+    assertResponse(JOB_UPDATING_ERROR, thrift.addInstances(INSTANCE_KEY, 1));
     assertEquals(0L, statsProvider.getLongValue(ADD_INSTANCES));
   }
 
@@ -1325,7 +1315,7 @@ public class SchedulerThriftInterfaceTest extends 
EasyMockTest {
     IScheduledTask activeTask = buildScheduledTask();
     ITaskConfig task = activeTask.getAssignedTask().getTask();
     expectNoCronJob();
-    lockManager.assertNotLocked(LOCK_KEY);
+    jobUpdateController.assertNotUpdating(JOB_KEY);
     storageUtil.expectTaskFetch(Query.jobScoped(JOB_KEY).active(), activeTask);
     expectInstanceQuotaCheck(task, NOT_ENOUGH_QUOTA);
 
@@ -1340,7 +1330,7 @@ public class SchedulerThriftInterfaceTest extends 
EasyMockTest {
     IScheduledTask activeTask = buildScheduledTask();
     ITaskConfig task = activeTask.getAssignedTask().getTask();
     expectNoCronJob();
-    lockManager.assertNotLocked(LOCK_KEY);
+    jobUpdateController.assertNotUpdating(JOB_KEY);
     storageUtil.expectTaskFetch(Query.jobScoped(JOB_KEY).active(), activeTask);
     expectInstanceQuotaCheck(task, ENOUGH_QUOTA);
     stateManager.insertPendingTasks(

http://git-wip-us.apache.org/repos/asf/aurora/blob/73e02c06/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java 
b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
index 453366e..0a76f53 100644
--- a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
@@ -55,7 +55,6 @@ import org.apache.aurora.gen.JobUpdateSettings;
 import org.apache.aurora.gen.JobUpdateState;
 import org.apache.aurora.gen.JobUpdateStatus;
 import org.apache.aurora.gen.JobUpdateSummary;
-import org.apache.aurora.gen.LockKey;
 import org.apache.aurora.gen.Metadata;
 import org.apache.aurora.gen.Range;
 import org.apache.aurora.gen.ScheduleStatus;
@@ -93,7 +92,6 @@ import 
org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary;
 import org.apache.aurora.scheduler.storage.entities.ILock;
-import org.apache.aurora.scheduler.storage.entities.ILockKey;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.apache.aurora.scheduler.testing.FakeScheduledExecutor;
@@ -221,18 +219,18 @@ public class JobUpdaterIT extends EasyMockTest {
   @After
   public void validateExitState() {
     clock.assertEmpty();
-    assertEquals(ImmutableList.of(), 
ImmutableList.copyOf(lockManager.getLocks()));
+    assertEquals(ImmutableSet.of(), storage.read(p -> 
p.getLockStore().fetchLocks()));
   }
 
   @Test(expected = UpdateStateException.class)
   public void testJobLocked() throws Exception {
     control.replay();
 
-    ILock lock = 
lockManager.acquireLock(ILockKey.build(LockKey.job(JOB.newBuilder())), USER);
+    lockManager.acquireLock(JOB, USER);
     try {
       updater.start(makeJobUpdate(makeInstanceConfig(0, 0, NEW_CONFIG)), 
AUDIT);
     } finally {
-      lockManager.releaseLock(lock);
+      lockManager.releaseLock(JOB);
     }
   }
 
@@ -1086,13 +1084,17 @@ public class JobUpdaterIT extends EasyMockTest {
   }
 
   private void releaseAllLocks() {
-    for (ILock lock : lockManager.getLocks()) {
-      lockManager.releaseLock(lock);
+    for (ILock lock : storage.read(p -> p.getLockStore().fetchLocks())) {
+      lockManager.releaseLock(lock.getKey().getJob());
     }
   }
 
   @Test
   public void testLostLock() throws Exception {
+    // Validates the 'write-only' nature of locks in relation to job updates.  
We are in backwards
+    // compatibility mode for LockStore, so it is not used to control behavior 
of updates.
+    // As a result, out-of-band removal of locks will not impact the progress 
of job updates.
+
     expectTaskKilled();
 
     control.replay();
@@ -1111,8 +1113,7 @@ public class JobUpdaterIT extends EasyMockTest {
     changeState(JOB, 0, KILLED);
     ImmutableMultimap.Builder<Integer, JobUpdateAction> actions = 
ImmutableMultimap.builder();
     actions.putAll(0, INSTANCE_UPDATING);
-    assertState(ERROR, actions.build());
-    assertLatestUpdateMessage(JobUpdateControllerImpl.LOST_LOCK_MESSAGE);
+    assertState(ROLLING_FORWARD, actions.build());
   }
 
   private void expectInvalid(JobUpdate update)
@@ -1167,9 +1168,7 @@ public class JobUpdaterIT extends EasyMockTest {
 
       JobUpdate builder = update.newBuilder();
       builder.getInstructions().getSettings().setUpdateGroupSize(0);
-      for (ILock lock : lockManager.getLocks()) {
-        lockManager.releaseLock(lock);
-      }
+      releaseAllLocks();
       saveJobUpdate(store, IJobUpdate.build(builder), ROLLING_FORWARD);
     });
 
@@ -1194,8 +1193,7 @@ public class JobUpdaterIT extends EasyMockTest {
 
     ILock lock;
     try {
-      lock = lockManager.acquireLock(
-          
ILockKey.build(LockKey.job(update.getSummary().getKey().getJob().newBuilder())),
 USER);
+      lock = lockManager.acquireLock(update.getSummary().getKey().getJob(), 
USER);
     } catch (LockManager.LockException e) {
       throw Throwables.propagate(e);
     }
@@ -1258,7 +1256,7 @@ public class JobUpdaterIT extends EasyMockTest {
 
     storage.write((NoResult.Quiet) storeProvider -> {
       ILock lock = saveJobUpdate(storeProvider.getJobUpdateStore(), update, 
ROLLING_FORWARD);
-      lockManager.releaseLock(lock);
+      lockManager.releaseLock(lock.getKey().getJob());
     });
 
     subscriber.startAsync().awaitRunning();
@@ -1409,7 +1407,7 @@ public class JobUpdaterIT extends EasyMockTest {
     releaseAllLocks();
     updater.abort(update.getSummary().getKey(), AUDIT);
     clock.advance(WATCH_TIMEOUT);
-    assertState(ERROR, actions.build());
+    assertState(ABORTED, actions.build());
   }
 
   @Test
@@ -1674,9 +1672,9 @@ public class JobUpdaterIT extends EasyMockTest {
       assertEquals(
           inProgress.getSummary().newBuilder().setState(new 
JobUpdateState(ROLLING_FORWARD, 0, 0)),
           e.getInProgressUpdateSummary().newBuilder());
-      assertEquals(ImmutableList.of(lock), 
ImmutableList.copyOf(lockManager.getLocks()));
+      assertEquals(ImmutableSet.of(lock), storage.read(p -> 
p.getLockStore().fetchLocks()));
     } finally {
-      lockManager.releaseLock(lock);
+      lockManager.releaseLock(lock.getKey().getJob());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/73e02c06/src/test/python/apache/aurora/client/cli/test_create.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/cli/test_create.py 
b/src/test/python/apache/aurora/client/cli/test_create.py
index e029ada..73e91cf 100644
--- a/src/test/python/apache/aurora/client/cli/test_create.py
+++ b/src/test/python/apache/aurora/client/cli/test_create.py
@@ -67,7 +67,7 @@ class TestCreateJobCommand(AuroraClientCommandTest):
     mock_api = fake_context.get_api("test")
 
     mock_api.create_job.return_value = 
AuroraClientCommandTest.create_blank_response(
-      ResponseCode.LOCK_ERROR, "Error.")
+      ResponseCode.JOB_UPDATING_ERROR, "Error.")
 
     with pytest.raises(Context.CommandError):
       command.execute(fake_context)

http://git-wip-us.apache.org/repos/asf/aurora/blob/73e02c06/src/test/python/apache/aurora/client/cli/test_kill.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/cli/test_kill.py 
b/src/test/python/apache/aurora/client/cli/test_kill.py
index 269b566..0e859dc 100644
--- a/src/test/python/apache/aurora/client/cli/test_kill.py
+++ b/src/test/python/apache/aurora/client/cli/test_kill.py
@@ -50,7 +50,7 @@ class TestInstancesParser(unittest.TestCase):
 
 class TestKillCommand(AuroraClientCommandTest):
 
-  def test_kill_lock_error_nobatch(self):
+  def test_kill_updating_error_nobatch(self):
     """Verify that the no batch code path correctly includes the lock error 
message."""
     command = KillCommand()
     mock_options = mock_verb_options(command)
@@ -62,7 +62,7 @@ class TestKillCommand(AuroraClientCommandTest):
 
     mock_api = fake_context.get_api('test')
     mock_api.kill_job.return_value = 
AuroraClientCommandTest.create_blank_response(
-      ResponseCode.LOCK_ERROR, "Error.")
+      ResponseCode.JOB_UPDATING_ERROR, "Error.")
 
     with pytest.raises(Context.CommandError):
       command.execute(fake_context)
@@ -75,7 +75,7 @@ class TestKillCommand(AuroraClientCommandTest):
 
     self.assert_lock_message(fake_context)
 
-  def test_kill_lock_error_batches(self):
+  def test_kill_updating_error_batches(self):
     """Verify that the batch kill path short circuits and includes the lock 
error message."""
     command = KillCommand()
     mock_options = mock_verb_options(command)
@@ -91,7 +91,7 @@ class TestKillCommand(AuroraClientCommandTest):
 
     mock_api = fake_context.get_api('test')
     mock_api.kill_job.return_value = 
AuroraClientCommandTest.create_blank_response(
-      ResponseCode.LOCK_ERROR, "Error.")
+      ResponseCode.JOB_UPDATING_ERROR, "Error.")
 
     with pytest.raises(Context.CommandError):
       command.execute(fake_context)

http://git-wip-us.apache.org/repos/asf/aurora/blob/73e02c06/src/test/python/apache/aurora/client/cli/test_supdate.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/cli/test_supdate.py 
b/src/test/python/apache/aurora/client/cli/test_supdate.py
index 8b90885..a3bb5b9 100644
--- a/src/test/python/apache/aurora/client/cli/test_supdate.py
+++ b/src/test/python/apache/aurora/client/cli/test_supdate.py
@@ -117,7 +117,7 @@ class TestStartUpdate(AuroraClientCommandTest):
     mock_config = self.create_mock_config()
     self._fake_context.get_job_config = Mock(return_value=mock_config)
     self._mock_api.start_job_update.return_value = 
AuroraClientCommandTest.create_blank_response(
-        ResponseCode.LOCK_ERROR,
+        ResponseCode.JOB_UPDATING_ERROR,
         "Error.")
 
     with patch('apache.aurora.client.cli.update.DiffFormatter'):

http://git-wip-us.apache.org/repos/asf/aurora/blob/73e02c06/src/test/python/apache/aurora/client/cli/util.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/cli/util.py 
b/src/test/python/apache/aurora/client/cli/util.py
index 43db828..476b8c2 100644
--- a/src/test/python/apache/aurora/client/cli/util.py
+++ b/src/test/python/apache/aurora/client/cli/util.py
@@ -396,7 +396,8 @@ jobs = [HELLO_WORLD]
 
   @classmethod
   def assert_lock_message(cls, context):
-    assert [line for line in context.get_err() if line == "\t%s" % 
context.LOCK_ERROR_MSG]
+    assert [line for line in context.get_err() if line == "\t%s" %
+        context.JOB_UPDATING_ERROR_MSG]
 
   PREFERRED_TIER = TierConfig(
     name='preferred',

http://git-wip-us.apache.org/repos/asf/aurora/blob/73e02c06/src/test/python/apache/aurora/client/test_base.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/test_base.py 
b/src/test/python/apache/aurora/client/test_base.py
index 691ed67..93fad1e 100644
--- a/src/test/python/apache/aurora/client/test_base.py
+++ b/src/test/python/apache/aurora/client/test_base.py
@@ -124,11 +124,11 @@ class TestBase(unittest.TestCase):
   @mock.patch('apache.aurora.client.base.log.info')
   @mock.patch('apache.aurora.client.base.sys.exit')
   def test_check_and_log_response(self, mock_sys_exit, mock_log):
-    resp = Response(responseCode=ResponseCode.LOCK_ERROR)
+    resp = Response(responseCode=ResponseCode.JOB_UPDATING_ERROR)
     out = base.check_and_log_response(resp)
     self.assertIsNone(out)
     mock_sys_exit.assert_called_once_with(1)
-    mock_log.assert_any_call('Response from scheduler: LOCK_ERROR (message: )')
+    mock_log.assert_any_call('Response from scheduler: JOB_UPDATING_ERROR 
(message: )')
 
   @mock.patch('apache.aurora.client.base.sys.exit')
   @mock.patch('apache.aurora.client.base.log.fatal')

Reply via email to