Remove LockStore

Reviewed at https://reviews.apache.org/r/63744/
Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/4fecf1f5 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/4fecf1f5 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/4fecf1f5 Branch: refs/heads/master Commit: 4fecf1f594e09a5ed6909df49faeee51e5007f8e Parents: fb64df2 Author: Bill Farner <[email protected]> Authored: Mon Nov 13 16:35:36 2017 -0800 Committer: Bill Farner <[email protected]> Committed: Mon Nov 13 16:35:36 2017 -0800 ---------------------------------------------------------------------- .../thrift/org/apache/aurora/gen/api.thrift | 19 -- .../thrift/org/apache/aurora/gen/storage.thrift | 19 +- .../org/apache/aurora/benchmark/JobUpdates.java | 13 +- .../aurora/benchmark/ThriftApiBenchmarks.java | 2 - .../aurora/benchmark/UpdateStoreBenchmarks.java | 3 - .../ShiroAuthorizingParamInterceptor.java | 10 - .../aurora/scheduler/state/LockManager.java | 50 ----- .../aurora/scheduler/state/LockManagerImpl.java | 93 --------- .../aurora/scheduler/state/StateModule.java | 3 - .../scheduler/storage/JobUpdateStore.java | 9 +- .../aurora/scheduler/storage/LockStore.java | 61 ------ .../aurora/scheduler/storage/Storage.java | 2 - .../scheduler/storage/log/LogStorage.java | 20 +- .../storage/log/SnapshotStoreImpl.java | 37 +--- .../storage/log/WriteAheadStorage.java | 56 +----- .../storage/mem/MemJobUpdateStore.java | 81 ++------ .../scheduler/storage/mem/MemLockStore.java | 72 ------- .../scheduler/storage/mem/MemStorage.java | 7 - .../scheduler/storage/mem/MemStorageModule.java | 2 - .../thrift/SchedulerThriftInterface.java | 24 +-- .../updater/JobUpdateControllerImpl.java | 19 +- .../scheduler/http/AbstractJettyTest.java | 2 - .../scheduler/state/LockManagerImplTest.java | 99 --------- .../storage/AbstractJobUpdateStoreTest.java | 165 +++++---------- .../storage/AbstractLockStoreTest.java | 200 ------------------- .../scheduler/storage/backup/RecoveryTest.java | 1 - 
.../scheduler/storage/log/LogStorageTest.java | 80 +------- .../storage/log/SnapshotStoreImplIT.java | 11 +- .../storage/log/WriteAheadStorageTest.java | 8 - .../scheduler/storage/mem/MemLockStoreTest.java | 24 --- .../storage/testing/StorageTestUtil.java | 5 - .../aurora/scheduler/thrift/Fixtures.java | 3 - .../thrift/SchedulerThriftInterfaceTest.java | 77 +------ .../aurora/scheduler/updater/JobUpdaterIT.java | 183 ++--------------- .../aurora/client/api/test_scheduler_client.py | 6 +- 35 files changed, 126 insertions(+), 1340 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/4fecf1f5/api/src/main/thrift/org/apache/aurora/gen/api.thrift ---------------------------------------------------------------------- diff --git a/api/src/main/thrift/org/apache/aurora/gen/api.thrift b/api/src/main/thrift/org/apache/aurora/gen/api.thrift index c869493..1d36926 100644 --- a/api/src/main/thrift/org/apache/aurora/gen/api.thrift +++ b/api/src/main/thrift/org/apache/aurora/gen/api.thrift @@ -115,25 +115,6 @@ struct JobKey { 3: string name } -/** A unique lock key. */ -union LockKey { - 1: JobKey job -} - -/** A generic lock struct to facilitate context specific resource/operation serialization. */ -struct Lock { - /** ID of the lock - unique per storage */ - 1: LockKey key - /** UUID - facilitating soft lock authorization */ - 2: string token - /** Lock creator */ - 3: string user - /** Lock creation timestamp in milliseconds */ - 4: i64 timestampMs - /** Optional message to record with the lock */ - 5: optional string message -} - /** A unique identifier for the active task within a job. */ struct InstanceKey { /** Key identifying the job. 
*/ http://git-wip-us.apache.org/repos/asf/aurora/blob/4fecf1f5/api/src/main/thrift/org/apache/aurora/gen/storage.thrift ---------------------------------------------------------------------- diff --git a/api/src/main/thrift/org/apache/aurora/gen/storage.thrift b/api/src/main/thrift/org/apache/aurora/gen/storage.thrift index 74983ba..c692a5f 100644 --- a/api/src/main/thrift/org/apache/aurora/gen/storage.thrift +++ b/api/src/main/thrift/org/apache/aurora/gen/storage.thrift @@ -28,14 +28,6 @@ struct SaveCronJob { 2: api.JobConfiguration jobConfig } -struct SaveLock { - 1: api.Lock lock -} - -struct RemoveLock { - 1: api.LockKey lockKey -} - struct RemoveJob { 2: api.JobKey jobKey } @@ -63,13 +55,12 @@ struct SaveHostAttributes { struct SaveJobUpdate { 1: api.JobUpdate jobUpdate - 2: string lockToken + // 2: deleted } struct StoredJobUpdateDetails { 1: api.JobUpdateDetails details - /** ID of the lock associated with this update. */ - 2: string lockToken + // 2: deleted } struct SaveJobUpdateEvent { @@ -97,8 +88,8 @@ union Op { 9: RemoveQuota removeQuota 10: SaveHostAttributes saveHostAttributes // 11: removed - 12: SaveLock saveLock - 13: RemoveLock removeLock + // 12: deleted + // 13: deleted 14: SaveJobUpdate saveJobUpdate 15: SaveJobUpdateEvent saveJobUpdateEvent 16: SaveJobInstanceUpdateEvent saveJobInstanceUpdateEvent @@ -142,7 +133,7 @@ struct Snapshot { 5: set<StoredCronJob> cronJobs 6: SchedulerMetadata schedulerMetadata 8: set<QuotaConfiguration> quotaConfigurations - 9: set<api.Lock> locks + // 9: deleted 10: set<StoredJobUpdateDetails> jobUpdateDetails //11: removed //12: removed http://git-wip-us.apache.org/repos/asf/aurora/blob/4fecf1f5/src/jmh/java/org/apache/aurora/benchmark/JobUpdates.java ---------------------------------------------------------------------- diff --git a/src/jmh/java/org/apache/aurora/benchmark/JobUpdates.java b/src/jmh/java/org/apache/aurora/benchmark/JobUpdates.java index cedddf4..a5d1894 100644 --- 
a/src/jmh/java/org/apache/aurora/benchmark/JobUpdates.java +++ b/src/jmh/java/org/apache/aurora/benchmark/JobUpdates.java @@ -17,7 +17,6 @@ import java.util.Arrays; import java.util.Set; import java.util.UUID; -import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; @@ -33,8 +32,6 @@ import org.apache.aurora.gen.JobUpdateKey; import org.apache.aurora.gen.JobUpdateSettings; import org.apache.aurora.gen.JobUpdateStatus; import org.apache.aurora.gen.JobUpdateSummary; -import org.apache.aurora.gen.Lock; -import org.apache.aurora.gen.LockKey; import org.apache.aurora.gen.Metadata; import org.apache.aurora.gen.Range; import org.apache.aurora.gen.TaskConfig; @@ -46,7 +43,6 @@ import org.apache.aurora.scheduler.storage.entities.IJobKey; import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails; import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent; import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey; -import org.apache.aurora.scheduler.storage.entities.ILock; /** * Job update factory. 
@@ -71,14 +67,7 @@ final class JobUpdates { for (IJobUpdateDetails details : updates) { IJobUpdateKey key = details.getUpdate().getSummary().getKey(); keyBuilder.add(key); - String lockToken = UUID.randomUUID().toString(); - store.getLockStore().saveLock(ILock.build(new Lock() - .setKey(LockKey.job(key.getJob().newBuilder())) - .setToken(lockToken) - .setUser(Builder.USER) - .setTimestampMs(0L))); - - updateStore.saveJobUpdate(details.getUpdate(), Optional.of(lockToken)); + updateStore.saveJobUpdate(details.getUpdate()); for (IJobUpdateEvent updateEvent : details.getUpdateEvents()) { updateStore.saveJobUpdateEvent(key, updateEvent); http://git-wip-us.apache.org/repos/asf/aurora/blob/4fecf1f5/src/jmh/java/org/apache/aurora/benchmark/ThriftApiBenchmarks.java ---------------------------------------------------------------------- diff --git a/src/jmh/java/org/apache/aurora/benchmark/ThriftApiBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/ThriftApiBenchmarks.java index 7ccdb11..05071a5 100644 --- a/src/jmh/java/org/apache/aurora/benchmark/ThriftApiBenchmarks.java +++ b/src/jmh/java/org/apache/aurora/benchmark/ThriftApiBenchmarks.java @@ -35,7 +35,6 @@ import org.apache.aurora.scheduler.base.TaskTestUtil; import org.apache.aurora.scheduler.configuration.ConfigurationManager; import org.apache.aurora.scheduler.cron.CronPredictor; import org.apache.aurora.scheduler.quota.QuotaManager; -import org.apache.aurora.scheduler.state.LockManager; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.aurora.scheduler.storage.mem.MemStorageModule; @@ -148,7 +147,6 @@ public class ThriftApiBenchmarks { bind(Clock.class).toInstance(Clock.SYSTEM_CLOCK); bind(CronPredictor.class).toInstance(createThrowingFake(CronPredictor.class)); bind(QuotaManager.class).toInstance(createThrowingFake(QuotaManager.class)); - bind(LockManager.class).toInstance(createThrowingFake(LockManager.class)); 
bind(StatsProvider.class).toInstance(new FakeStatsProvider()); bind(ConfigurationManager.class).toInstance(TaskTestUtil.CONFIGURATION_MANAGER); } http://git-wip-us.apache.org/repos/asf/aurora/blob/4fecf1f5/src/jmh/java/org/apache/aurora/benchmark/UpdateStoreBenchmarks.java ---------------------------------------------------------------------- diff --git a/src/jmh/java/org/apache/aurora/benchmark/UpdateStoreBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/UpdateStoreBenchmarks.java index 992e950..c98c514 100644 --- a/src/jmh/java/org/apache/aurora/benchmark/UpdateStoreBenchmarks.java +++ b/src/jmh/java/org/apache/aurora/benchmark/UpdateStoreBenchmarks.java @@ -68,7 +68,6 @@ public class UpdateStoreBenchmarks { public void tearDownIteration() { storage.write((NoResult.Quiet) storeProvider -> { storeProvider.getJobUpdateStore().deleteAllUpdatesAndEvents(); - storeProvider.getLockStore().deleteLocks(); }); } @@ -108,7 +107,6 @@ public class UpdateStoreBenchmarks { public void tearDownIteration() { storage.write((NoResult.Quiet) storeProvider -> { storeProvider.getJobUpdateStore().deleteAllUpdatesAndEvents(); - storeProvider.getLockStore().deleteLocks(); }); } @@ -148,7 +146,6 @@ public class UpdateStoreBenchmarks { public void tearDownIteration() { storage.write((NoResult.Quiet) storeProvider -> { storeProvider.getJobUpdateStore().deleteAllUpdatesAndEvents(); - storeProvider.getLockStore().deleteLocks(); }); } http://git-wip-us.apache.org/repos/asf/aurora/blob/4fecf1f5/src/main/java/org/apache/aurora/scheduler/http/api/security/ShiroAuthorizingParamInterceptor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/http/api/security/ShiroAuthorizingParamInterceptor.java b/src/main/java/org/apache/aurora/scheduler/http/api/security/ShiroAuthorizingParamInterceptor.java index 474a403..203599e 100644 --- 
a/src/main/java/org/apache/aurora/scheduler/http/api/security/ShiroAuthorizingParamInterceptor.java +++ b/src/main/java/org/apache/aurora/scheduler/http/api/security/ShiroAuthorizingParamInterceptor.java @@ -43,8 +43,6 @@ import org.apache.aurora.gen.JobConfiguration; import org.apache.aurora.gen.JobKey; import org.apache.aurora.gen.JobUpdateKey; import org.apache.aurora.gen.JobUpdateRequest; -import org.apache.aurora.gen.Lock; -import org.apache.aurora.gen.LockKey; import org.apache.aurora.gen.Response; import org.apache.aurora.gen.ResponseCode; import org.apache.aurora.gen.TaskConfig; @@ -106,12 +104,6 @@ class ShiroAuthorizingParamInterceptor implements MethodInterceptor { private static final FieldGetter<JobConfiguration, JobKey> JOB_CONFIGURATION_GETTER = new ThriftFieldGetter<>(JobConfiguration.class, JobConfiguration._Fields.KEY, JobKey.class); - private static final FieldGetter<Lock, LockKey> LOCK_GETTER = - new ThriftFieldGetter<>(Lock.class, Lock._Fields.KEY, LockKey.class); - - private static final FieldGetter<LockKey, JobKey> LOCK_KEY_GETTER = - new ThriftFieldGetter<>(LockKey.class, LockKey._Fields.JOB, JobKey.class); - private static final FieldGetter<JobUpdateKey, JobKey> JOB_UPDATE_KEY_GETTER = new ThriftFieldGetter<>(JobUpdateKey.class, JobUpdateKey._Fields.JOB, JobKey.class); @@ -124,8 +116,6 @@ class ShiroAuthorizingParamInterceptor implements MethodInterceptor { FieldGetters.compose(UPDATE_REQUEST_GETTER, TASK_CONFIG_GETTER), TASK_CONFIG_GETTER, JOB_CONFIGURATION_GETTER, - FieldGetters.compose(LOCK_GETTER, LOCK_KEY_GETTER), - LOCK_KEY_GETTER, JOB_UPDATE_KEY_GETTER, INSTANCE_KEY_GETTER, new IdentityFieldGetter<>(JobKey.class)); http://git-wip-us.apache.org/repos/asf/aurora/blob/4fecf1f5/src/main/java/org/apache/aurora/scheduler/state/LockManager.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/state/LockManager.java 
b/src/main/java/org/apache/aurora/scheduler/state/LockManager.java deleted file mode 100644 index 1a65b08..0000000 --- a/src/main/java/org/apache/aurora/scheduler/state/LockManager.java +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.state; - -import org.apache.aurora.scheduler.storage.entities.IJobKey; -import org.apache.aurora.scheduler.storage.entities.ILock; - -/** - * Defines all {@link ILock} primitives like: acquire, release, validate. - */ -public interface LockManager { - /** - * Creates, saves and returns a new {@link ILock} with the specified {@link IJobKey}. - * This method is not re-entrant, i.e. attempting to acquire a lock with the - * same key would throw a {@link LockException}. - * - * @param job The job being locked. - * @param user Name of the user requesting a lock. - * @return A new ILock instance. - * @throws LockException In case the lock with specified key already exists. - */ - ILock acquireLock(IJobKey job, String user) throws LockException; - - /** - * Releases (removes) the lock associated with {@code job} from the system. - * - * @param job the job to unlock. - */ - void releaseLock(IJobKey job); - - /** - * Thrown when {@link ILock} related operation failed. 
- */ - class LockException extends Exception { - public LockException(String msg) { - super(msg); - } - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/4fecf1f5/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java deleted file mode 100644 index ec05f50..0000000 --- a/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java +++ /dev/null @@ -1,93 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.aurora.scheduler.state; - -import java.util.Date; -import java.util.Optional; - -import javax.inject.Inject; - -import com.google.common.annotations.VisibleForTesting; - -import org.apache.aurora.common.util.Clock; -import org.apache.aurora.gen.Lock; -import org.apache.aurora.gen.LockKey; -import org.apache.aurora.gen.LockKey._Fields; -import org.apache.aurora.scheduler.base.JobKeys; -import org.apache.aurora.scheduler.storage.LockStore; -import org.apache.aurora.scheduler.storage.Storage; -import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; -import org.apache.aurora.scheduler.storage.entities.IJobKey; -import org.apache.aurora.scheduler.storage.entities.ILock; -import org.apache.aurora.scheduler.storage.entities.ILockKey; - -import static java.util.Objects.requireNonNull; - -/** - * Implements lock-related primitives required to provide mutual exclusion guarantees - * to the critical Scheduler state-mutating operations. - */ -@VisibleForTesting -public class LockManagerImpl implements LockManager { - private final Storage storage; - private final Clock clock; - private final UUIDGenerator tokenGenerator; - - @Inject - LockManagerImpl(Storage storage, Clock clock, UUIDGenerator tokenGenerator) { - this.storage = requireNonNull(storage); - this.clock = requireNonNull(clock); - this.tokenGenerator = requireNonNull(tokenGenerator); - } - - @Override - public ILock acquireLock(IJobKey job, final String user) throws LockException { - return storage.write(storeProvider -> { - - LockStore.Mutable lockStore = storeProvider.getLockStore(); - ILockKey lockKey = ILockKey.build(LockKey.job(job.newBuilder())); - Optional<ILock> existingLock = lockStore.fetchLock(lockKey); - - if (existingLock.isPresent()) { - throw new LockException(String.format( - "Operation for: %s is already in progress. Started at: %s. 
Current owner: %s.", - formatLockKey(lockKey), - new Date(existingLock.get().getTimestampMs()).toString(), - existingLock.get().getUser())); - } - - ILock lock = ILock.build(new Lock() - .setKey(lockKey.newBuilder()) - .setToken(tokenGenerator.createNew().toString()) - .setTimestampMs(clock.nowMillis()) - .setUser(user)); - - lockStore.saveLock(lock); - return lock; - }); - } - - @Override - public void releaseLock(IJobKey job) { - storage.write((NoResult.Quiet) storeProvider -> { - storeProvider.getLockStore().removeLock(ILockKey.build(LockKey.job(job.newBuilder()))); - }); - } - - private static String formatLockKey(ILockKey lockKey) { - return lockKey.getSetField() == _Fields.JOB - ? JobKeys.canonicalString(lockKey.getJob()) - : "Unknown lock key type: " + lockKey.getSetField(); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/4fecf1f5/src/main/java/org/apache/aurora/scheduler/state/StateModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateModule.java b/src/main/java/org/apache/aurora/scheduler/state/StateModule.java index d72f055..c03fff1 100644 --- a/src/main/java/org/apache/aurora/scheduler/state/StateModule.java +++ b/src/main/java/org/apache/aurora/scheduler/state/StateModule.java @@ -14,7 +14,6 @@ package org.apache.aurora.scheduler.state; import java.util.List; - import javax.inject.Singleton; import com.beust.jcommander.Parameter; @@ -64,8 +63,6 @@ public class StateModule extends AbstractModule { bind(UUIDGenerator.class).to(UUIDGeneratorImpl.class); bind(UUIDGeneratorImpl.class).in(Singleton.class); - bind(LockManager.class).to(LockManagerImpl.class); - bind(LockManagerImpl.class).in(Singleton.class); bindMaintenanceController(binder()); } http://git-wip-us.apache.org/repos/asf/aurora/blob/4fecf1f5/src/main/java/org/apache/aurora/scheduler/storage/JobUpdateStore.java ---------------------------------------------------------------------- diff --git 
a/src/main/java/org/apache/aurora/scheduler/storage/JobUpdateStore.java b/src/main/java/org/apache/aurora/scheduler/storage/JobUpdateStore.java index 5b57399..b3d906b 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/JobUpdateStore.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/JobUpdateStore.java @@ -20,7 +20,6 @@ import java.util.Set; import com.google.common.base.Optional; import org.apache.aurora.gen.JobUpdateStatus; -import org.apache.aurora.gen.storage.StoredJobUpdateDetails; import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent; import org.apache.aurora.scheduler.storage.entities.IJobUpdate; import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails; @@ -96,8 +95,7 @@ public interface JobUpdateStore { * * @return A read-only view of all job update details. */ - Set<StoredJobUpdateDetails> fetchAllJobUpdateDetails(); - + Set<IJobUpdateDetails> fetchAllJobUpdateDetails(); /** * Fetches the events that have affected an instance within a job update. * @@ -125,11 +123,8 @@ public interface JobUpdateStore { * without having at least one {@link IJobUpdateEvent} present in the store will return empty. * * @param update Update to save. - * @param lockToken Optional UUID identifying the lock associated with this update. - * The {@code lockToken} can be absent when terminal updates are re-inserted - * during snapshot restore. */ - void saveJobUpdate(IJobUpdate update, Optional<String> lockToken); + void saveJobUpdate(IJobUpdate update); /** * Saves a new job update event. 
http://git-wip-us.apache.org/repos/asf/aurora/blob/4fecf1f5/src/main/java/org/apache/aurora/scheduler/storage/LockStore.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/LockStore.java b/src/main/java/org/apache/aurora/scheduler/storage/LockStore.java deleted file mode 100644 index 9764a01..0000000 --- a/src/main/java/org/apache/aurora/scheduler/storage/LockStore.java +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.storage; - -import java.util.Optional; -import java.util.Set; - -import org.apache.aurora.scheduler.storage.entities.ILock; -import org.apache.aurora.scheduler.storage.entities.ILockKey; - -/** - * Stores all lock-related data and defines methods for saving, deleting and fetching locks. - */ -public interface LockStore { - /** - * Fetches all locks available in the store. - * - * @return All locks in the store. - */ - Set<ILock> fetchLocks(); - - /** - * Fetches a lock by its key. - * - * @param lockKey Key of the lock to fetch. - * @return Optional lock. - */ - Optional<ILock> fetchLock(ILockKey lockKey); - - interface Mutable extends LockStore { - /** - * Saves a new lock or overwrites the existing one with same LockKey. - * - * @param lock ILock to save. - */ - void saveLock(ILock lock); - - /** - * Removes the lock from the store. - * - * @param lockKey Key of the lock to remove. 
- */ - void removeLock(ILockKey lockKey); - - /** - * Deletes all locks from the store. - */ - void deleteLocks(); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/4fecf1f5/src/main/java/org/apache/aurora/scheduler/storage/Storage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java index 7e810ab..7d325b6 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java @@ -40,7 +40,6 @@ public interface Storage { SchedulerStore getSchedulerStore(); CronJobStore getCronJobStore(); TaskStore getTaskStore(); - LockStore getLockStore(); QuotaStore getQuotaStore(); AttributeStore getAttributeStore(); JobUpdateStore getJobUpdateStore(); @@ -69,7 +68,6 @@ public interface Storage { */ TaskStore.Mutable getUnsafeTaskStore(); - LockStore.Mutable getLockStore(); QuotaStore.Mutable getQuotaStore(); AttributeStore.Mutable getAttributeStore(); JobUpdateStore.Mutable getJobUpdateStore(); http://git-wip-us.apache.org/repos/asf/aurora/blob/4fecf1f5/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java index 3c9bae4..3ce2c7f 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java @@ -24,7 +24,6 @@ import java.util.function.Consumer; import javax.inject.Inject; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.MoreExecutors; @@ -51,7 +50,6 @@ import 
org.apache.aurora.scheduler.storage.AttributeStore; import org.apache.aurora.scheduler.storage.CronJobStore; import org.apache.aurora.scheduler.storage.DistributedSnapshotStore; import org.apache.aurora.scheduler.storage.JobUpdateStore; -import org.apache.aurora.scheduler.storage.LockStore; import org.apache.aurora.scheduler.storage.QuotaStore; import org.apache.aurora.scheduler.storage.SchedulerStore; import org.apache.aurora.scheduler.storage.SnapshotStore; @@ -64,8 +62,6 @@ import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent; import org.apache.aurora.scheduler.storage.entities.IJobKey; import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent; import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey; -import org.apache.aurora.scheduler.storage.entities.ILock; -import org.apache.aurora.scheduler.storage.entities.ILockKey; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -178,7 +174,6 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore private final SchedulerStore.Mutable writeBehindSchedulerStore; private final CronJobStore.Mutable writeBehindJobStore; private final TaskStore.Mutable writeBehindTaskStore; - private final LockStore.Mutable writeBehindLockStore; private final QuotaStore.Mutable writeBehindQuotaStore; private final AttributeStore.Mutable writeBehindAttributeStore; private final JobUpdateStore.Mutable writeBehindJobUpdateStore; @@ -211,7 +206,6 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore @Volatile SchedulerStore.Mutable schedulerStore, @Volatile CronJobStore.Mutable jobStore, @Volatile TaskStore.Mutable taskStore, - @Volatile LockStore.Mutable lockStore, @Volatile QuotaStore.Mutable quotaStore, @Volatile AttributeStore.Mutable attributeStore, @Volatile JobUpdateStore.Mutable jobUpdateStore, @@ -227,7 +221,6 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore schedulerStore, jobStore, taskStore, - 
lockStore, quotaStore, attributeStore, jobUpdateStore, @@ -246,7 +239,6 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore SchedulerStore.Mutable schedulerStore, CronJobStore.Mutable jobStore, TaskStore.Mutable taskStore, - LockStore.Mutable lockStore, QuotaStore.Mutable quotaStore, AttributeStore.Mutable attributeStore, JobUpdateStore.Mutable jobUpdateStore, @@ -267,7 +259,6 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore this.writeBehindSchedulerStore = requireNonNull(schedulerStore); this.writeBehindJobStore = requireNonNull(jobStore); this.writeBehindTaskStore = requireNonNull(taskStore); - this.writeBehindLockStore = requireNonNull(lockStore); this.writeBehindQuotaStore = requireNonNull(quotaStore); this.writeBehindAttributeStore = requireNonNull(attributeStore); this.writeBehindJobUpdateStore = requireNonNull(jobUpdateStore); @@ -289,7 +280,6 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore schedulerStore, jobStore, taskStore, - lockStore, quotaStore, attributeStore, jobUpdateStore, @@ -360,16 +350,9 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore LOG.info("Dropping host attributes with no agent ID: " + attributes); } }) - .put( - Op._Fields.SAVE_LOCK, - op -> writeBehindLockStore.saveLock(ILock.build(op.getSaveLock().getLock()))) - .put( - Op._Fields.REMOVE_LOCK, - op -> writeBehindLockStore.removeLock(ILockKey.build(op.getRemoveLock().getLockKey()))) .put(Op._Fields.SAVE_JOB_UPDATE, op -> writeBehindJobUpdateStore.saveJobUpdate( - thriftBackfill.backFillJobUpdate(op.getSaveJobUpdate().getJobUpdate()), - Optional.fromNullable(op.getSaveJobUpdate().getLockToken()))) + thriftBackfill.backFillJobUpdate(op.getSaveJobUpdate().getJobUpdate()))) .put(Op._Fields.SAVE_JOB_UPDATE_EVENT, op -> { SaveJobUpdateEvent event = op.getSaveJobUpdateEvent(); writeBehindJobUpdateStore.saveJobUpdateEvent( @@ -487,7 +470,6 @@ public class 
LogStorage implements NonVolatileStorage, DistributedSnapshotStore LOG.info("Snapshot complete." + " host attrs: " + snapshot.getHostAttributesSize() + ", cron jobs: " + snapshot.getCronJobsSize() - + ", locks: " + snapshot.getLocksSize() + ", quota confs: " + snapshot.getQuotaConfigurationsSize() + ", tasks: " + snapshot.getTasksSize() + ", updates: " + snapshot.getJobUpdateDetailsSize()); http://git-wip-us.apache.org/repos/asf/aurora/blob/4fecf1f5/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java index 6462b80..57c483b 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java @@ -16,11 +16,11 @@ package org.apache.aurora.scheduler.storage.log; import java.util.Arrays; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import javax.inject.Inject; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; @@ -36,7 +36,6 @@ import org.apache.aurora.gen.HostAttributes; import org.apache.aurora.gen.JobInstanceUpdateEvent; import org.apache.aurora.gen.JobUpdateDetails; import org.apache.aurora.gen.JobUpdateEvent; -import org.apache.aurora.gen.Lock; import org.apache.aurora.gen.storage.QuotaConfiguration; import org.apache.aurora.gen.storage.SchedulerMetadata; import org.apache.aurora.gen.storage.Snapshot; @@ -54,7 +53,6 @@ import org.apache.aurora.scheduler.storage.entities.IJobConfiguration; import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent; import 
org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent; import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey; -import org.apache.aurora.scheduler.storage.entities.ILock; import org.apache.aurora.scheduler.storage.entities.IResourceAggregate; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.slf4j.Logger; @@ -75,7 +73,6 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> { private static final Logger LOG = LoggerFactory.getLogger(SnapshotStoreImpl.class); - private static final String LOCK_FIELD = "locks"; private static final String HOST_ATTRIBUTES_FIELD = "hosts"; private static final String QUOTA_FIELD = "quota"; private static final String TASK_FIELD = "tasks"; @@ -94,29 +91,6 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> { new SnapshotField() { @Override public String getName() { - return LOCK_FIELD; - } - - // It's important for locks to be replayed first, since there are relations that expect - // references to be valid on insertion. 
- @Override - public void saveToSnapshot(MutableStoreProvider store, Snapshot snapshot) { - snapshot.setLocks(ILock.toBuildersSet(store.getLockStore().fetchLocks())); - } - - @Override - public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) { - if (snapshot.getLocksSize() > 0) { - store.getLockStore().deleteLocks(); - for (Lock lock : snapshot.getLocks()) { - store.getLockStore().saveLock(ILock.build(lock)); - } - } - } - }, - new SnapshotField() { - @Override - public String getName() { return HOST_ATTRIBUTES_FIELD; } @@ -243,7 +217,10 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> { @Override public void saveToSnapshot(MutableStoreProvider store, Snapshot snapshot) { - snapshot.setJobUpdateDetails(store.getJobUpdateStore().fetchAllJobUpdateDetails()); + snapshot.setJobUpdateDetails( + store.getJobUpdateStore().fetchAllJobUpdateDetails().stream() + .map(u -> new StoredJobUpdateDetails().setDetails(u.newBuilder())) + .collect(Collectors.toSet())); } @Override @@ -253,9 +230,7 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> { updateStore.deleteAllUpdatesAndEvents(); for (StoredJobUpdateDetails storedDetails : snapshot.getJobUpdateDetails()) { JobUpdateDetails details = storedDetails.getDetails(); - updateStore.saveJobUpdate( - thriftBackfill.backFillJobUpdate(details.getUpdate()), - Optional.fromNullable(storedDetails.getLockToken())); + updateStore.saveJobUpdate(thriftBackfill.backFillJobUpdate(details.getUpdate())); if (details.getUpdateEventsSize() > 0) { for (JobUpdateEvent updateEvent : details.getUpdateEvents()) { http://git-wip-us.apache.org/repos/asf/aurora/blob/4fecf1f5/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java index 
a5b58e8..4d051fc 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java @@ -25,7 +25,6 @@ import com.google.common.collect.ImmutableSet; import org.apache.aurora.gen.storage.Op; import org.apache.aurora.gen.storage.PruneJobUpdateHistory; import org.apache.aurora.gen.storage.RemoveJob; -import org.apache.aurora.gen.storage.RemoveLock; import org.apache.aurora.gen.storage.RemoveQuota; import org.apache.aurora.gen.storage.RemoveTasks; import org.apache.aurora.gen.storage.SaveCronJob; @@ -34,17 +33,14 @@ import org.apache.aurora.gen.storage.SaveHostAttributes; import org.apache.aurora.gen.storage.SaveJobInstanceUpdateEvent; import org.apache.aurora.gen.storage.SaveJobUpdate; import org.apache.aurora.gen.storage.SaveJobUpdateEvent; -import org.apache.aurora.gen.storage.SaveLock; import org.apache.aurora.gen.storage.SaveQuota; import org.apache.aurora.gen.storage.SaveTasks; -import org.apache.aurora.gen.storage.StoredJobUpdateDetails; import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.events.EventSink; import org.apache.aurora.scheduler.events.PubsubEvent; import org.apache.aurora.scheduler.storage.AttributeStore; import org.apache.aurora.scheduler.storage.CronJobStore; import org.apache.aurora.scheduler.storage.JobUpdateStore; -import org.apache.aurora.scheduler.storage.LockStore; import org.apache.aurora.scheduler.storage.QuotaStore; import org.apache.aurora.scheduler.storage.SchedulerStore; import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; @@ -60,8 +56,6 @@ import org.apache.aurora.scheduler.storage.entities.IJobUpdateInstructions; import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey; import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery; import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary; -import org.apache.aurora.scheduler.storage.entities.ILock; 
-import org.apache.aurora.scheduler.storage.entities.ILockKey; import org.apache.aurora.scheduler.storage.entities.IResourceAggregate; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.slf4j.Logger; @@ -80,7 +74,6 @@ class WriteAheadStorage implements SchedulerStore.Mutable, CronJobStore.Mutable, TaskStore.Mutable, - LockStore.Mutable, QuotaStore.Mutable, AttributeStore.Mutable, JobUpdateStore.Mutable { @@ -89,7 +82,6 @@ class WriteAheadStorage implements private final SchedulerStore.Mutable schedulerStore; private final CronJobStore.Mutable jobStore; private final TaskStore.Mutable taskStore; - private final LockStore.Mutable lockStore; private final QuotaStore.Mutable quotaStore; private final AttributeStore.Mutable attributeStore; private final JobUpdateStore.Mutable jobUpdateStore; @@ -103,7 +95,6 @@ class WriteAheadStorage implements * @param schedulerStore Delegate. * @param jobStore Delegate. * @param taskStore Delegate. - * @param lockStore Delegate. * @param quotaStore Delegate. * @param attributeStore Delegate. * @param jobUpdateStore Delegate. 
@@ -113,7 +104,6 @@ class WriteAheadStorage implements SchedulerStore.Mutable schedulerStore, CronJobStore.Mutable jobStore, TaskStore.Mutable taskStore, - LockStore.Mutable lockStore, QuotaStore.Mutable quotaStore, AttributeStore.Mutable attributeStore, JobUpdateStore.Mutable jobUpdateStore, @@ -124,7 +114,6 @@ class WriteAheadStorage implements this.schedulerStore = requireNonNull(schedulerStore); this.jobStore = requireNonNull(jobStore); this.taskStore = requireNonNull(taskStore); - this.lockStore = requireNonNull(lockStore); this.quotaStore = requireNonNull(quotaStore); this.attributeStore = requireNonNull(attributeStore); this.jobUpdateStore = requireNonNull(jobUpdateStore); @@ -221,27 +210,11 @@ class WriteAheadStorage implements } @Override - public void saveLock(final ILock lock) { - requireNonNull(lock); - - write(Op.saveLock(new SaveLock(lock.newBuilder()))); - lockStore.saveLock(lock); - } - - @Override - public void removeLock(final ILockKey lockKey) { - requireNonNull(lockKey); - - write(Op.removeLock(new RemoveLock(lockKey.newBuilder()))); - lockStore.removeLock(lockKey); - } - - @Override - public void saveJobUpdate(IJobUpdate update, Optional<String> lockToken) { + public void saveJobUpdate(IJobUpdate update) { requireNonNull(update); - write(Op.saveJobUpdate(new SaveJobUpdate(update.newBuilder(), lockToken.orNull()))); - jobUpdateStore.saveJobUpdate(update, lockToken); + write(Op.saveJobUpdate(new SaveJobUpdate().setJobUpdate(update.newBuilder()))); + jobUpdateStore.saveJobUpdate(update); } @Override @@ -306,12 +279,6 @@ class WriteAheadStorage implements } @Override - public void deleteLocks() { - throw new UnsupportedOperationException( - "Unsupported since casual storage users should never be doing this."); - } - - @Override public void deleteAllUpdatesAndEvents() { throw new UnsupportedOperationException( "Unsupported since casual storage users should never be doing this."); @@ -333,11 +300,6 @@ class WriteAheadStorage implements } @Override - 
public LockStore.Mutable getLockStore() { - return this; - } - - @Override public QuotaStore.Mutable getQuotaStore() { return this; } @@ -398,16 +360,6 @@ class WriteAheadStorage implements } @Override - public Set<ILock> fetchLocks() { - return this.lockStore.fetchLocks(); - } - - @Override - public java.util.Optional<ILock> fetchLock(ILockKey lockKey) { - return this.lockStore.fetchLock(lockKey); - } - - @Override public Optional<IHostAttributes> getHostAttributes(String host) { return this.attributeStore.getHostAttributes(host); } @@ -443,7 +395,7 @@ class WriteAheadStorage implements } @Override - public Set<StoredJobUpdateDetails> fetchAllJobUpdateDetails() { + public Set<IJobUpdateDetails> fetchAllJobUpdateDetails() { return this.jobUpdateStore.fetchAllJobUpdateDetails(); } http://git-wip-us.apache.org/repos/asf/aurora/blob/4fecf1f5/src/main/java/org/apache/aurora/scheduler/storage/mem/MemJobUpdateStore.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemJobUpdateStore.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemJobUpdateStore.java index d190add..826cee9 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemJobUpdateStore.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemJobUpdateStore.java @@ -32,7 +32,6 @@ import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Maps; @@ -50,10 +49,7 @@ import org.apache.aurora.gen.JobUpdateDetails; import org.apache.aurora.gen.JobUpdateEvent; import org.apache.aurora.gen.JobUpdateState; import org.apache.aurora.gen.JobUpdateStatus; -import org.apache.aurora.gen.LockKey; -import 
org.apache.aurora.gen.storage.StoredJobUpdateDetails; import org.apache.aurora.scheduler.storage.JobUpdateStore; -import org.apache.aurora.scheduler.storage.LockStore; import org.apache.aurora.scheduler.storage.Storage.StorageException; import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent; import org.apache.aurora.scheduler.storage.entities.IJobKey; @@ -64,8 +60,6 @@ import org.apache.aurora.scheduler.storage.entities.IJobUpdateInstructions; import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey; import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery; import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary; -import org.apache.aurora.scheduler.storage.entities.ILock; -import org.apache.aurora.scheduler.storage.entities.ILockKey; import static java.util.Objects.requireNonNull; @@ -78,14 +72,12 @@ public class MemJobUpdateStore implements JobUpdateStore.Mutable { .reverse() .onResultOf(u -> u.getUpdate().getSummary().getState().getLastModifiedTimestampMs()); - private final Map<IJobUpdateKey, UpdateAndLock> updates = Maps.newConcurrentMap(); - private final LockStore lockStore; + private final Map<IJobUpdateKey, IJobUpdateDetails> updates = Maps.newConcurrentMap(); private final LoadingCache<JobUpdateStatus, AtomicLong> jobUpdateEventStats; private final LoadingCache<JobUpdateAction, AtomicLong> jobUpdateActionStats; @Inject - public MemJobUpdateStore(LockStore.Mutable lockStore, StatsProvider statsProvider) { - this.lockStore = lockStore; + public MemJobUpdateStore(StatsProvider statsProvider) { this.jobUpdateEventStats = CacheBuilder.newBuilder() .build(new CacheLoader<JobUpdateStatus, AtomicLong>() { @Override @@ -125,13 +117,13 @@ public class MemJobUpdateStore implements JobUpdateStore.Mutable { @Timed("job_update_store_fetch_details") @Override public synchronized Optional<IJobUpdateDetails> fetchJobUpdateDetails(IJobUpdateKey key) { - return Optional.fromNullable(updates.get(key)).transform(u -> 
u.details); + return Optional.fromNullable(updates.get(key)); } @Timed("job_update_store_fetch_update") @Override public synchronized Optional<IJobUpdate> fetchJobUpdate(IJobUpdateKey key) { - return Optional.fromNullable(updates.get(key)).transform(u -> u.details.getUpdate()); + return Optional.fromNullable(updates.get(key)).transform(IJobUpdateDetails::getUpdate); } @Timed("job_update_store_fetch_instructions") @@ -140,37 +132,13 @@ public class MemJobUpdateStore implements JobUpdateStore.Mutable { IJobUpdateKey key) { return Optional.fromNullable(updates.get(key)) - .transform(u -> u.details.getUpdate().getInstructions()); - } - - private void refreshLocks() { - // Simulate database behavior of join performed against locks, used to populate lockToken field. - - ImmutableMap.Builder<IJobUpdateKey, UpdateAndLock> refreshed = ImmutableMap.builder(); - for (Map.Entry<IJobUpdateKey, UpdateAndLock> entry : updates.entrySet()) { - IJobUpdateDetails update = entry.getValue().details; - Optional<String> updateLock = entry.getValue().lockToken; - if (updateLock.isPresent()) { - // Determine if token needs to be cleared to reflect lock store state. The token may only - // remain if the lock store token exists and matches. 
- Optional<String> storedLock = Optional.fromNullable(lockStore.fetchLock(ILockKey.build( - LockKey.job(entry.getKey().getJob().newBuilder()))).map(ILock::getToken).orElse(null)); - if (!storedLock.isPresent() || !updateLock.equals(storedLock)) { - refreshed.put(entry.getKey(), new UpdateAndLock(update, Optional.absent())); - } - } - } - - updates.putAll(refreshed.build()); + .transform(u -> u.getUpdate().getInstructions()); } @Timed("job_update_store_fetch_all_details") @Override - public synchronized Set<StoredJobUpdateDetails> fetchAllJobUpdateDetails() { - refreshLocks(); - return updates.values().stream() - .map(u -> new StoredJobUpdateDetails(u.details.newBuilder(), u.lockToken.orNull())) - .collect(Collectors.toSet()); + public synchronized Set<IJobUpdateDetails> fetchAllJobUpdateDetails() { + return ImmutableSet.copyOf(updates.values()); } @Timed("job_update_store_fetch_instance_events") @@ -180,7 +148,7 @@ public class MemJobUpdateStore implements JobUpdateStore.Mutable { int instanceId) { return java.util.Optional.ofNullable(updates.get(key)) - .map(u -> u.details.getInstanceEvents()) + .map(IJobUpdateDetails::getInstanceEvents) .orElse(ImmutableList.of()) .stream() .filter(e -> e.getInstanceId() == instanceId) @@ -210,7 +178,7 @@ public class MemJobUpdateStore implements JobUpdateStore.Mutable { @Timed("job_update_store_save_update") @Override - public synchronized void saveJobUpdate(IJobUpdate update, Optional<String> lockToken) { + public synchronized void saveJobUpdate(IJobUpdate update) { requireNonNull(update); validateInstructions(update.getInstructions()); @@ -224,9 +192,7 @@ public class MemJobUpdateStore implements JobUpdateStore.Mutable { .setInstanceEvents(ImmutableList.of()); mutable.getUpdate().getSummary().setState(synthesizeUpdateState(mutable)); - updates.put( - update.getSummary().getKey(), - new UpdateAndLock(IJobUpdateDetails.build(mutable), lockToken)); + updates.put(update.getSummary().getKey(), IJobUpdateDetails.build(mutable)); } 
private static final Ordering<JobUpdateEvent> EVENT_ORDERING = Ordering.natural() @@ -235,16 +201,16 @@ public class MemJobUpdateStore implements JobUpdateStore.Mutable { @Timed("job_update_store_save_event") @Override public synchronized void saveJobUpdateEvent(IJobUpdateKey key, IJobUpdateEvent event) { - UpdateAndLock update = updates.get(key); + IJobUpdateDetails update = updates.get(key); if (update == null) { throw new StorageException("Update not found: " + key); } - JobUpdateDetails mutable = update.details.newBuilder(); + JobUpdateDetails mutable = update.newBuilder(); mutable.addToUpdateEvents(event.newBuilder()); mutable.setUpdateEvents(EVENT_ORDERING.sortedCopy(mutable.getUpdateEvents())); mutable.getUpdate().getSummary().setState(synthesizeUpdateState(mutable)); - updates.put(key, new UpdateAndLock(IJobUpdateDetails.build(mutable), update.lockToken)); + updates.put(key, IJobUpdateDetails.build(mutable)); jobUpdateEventStats.getUnchecked(event.getStatus()).incrementAndGet(); } @@ -257,16 +223,16 @@ public class MemJobUpdateStore implements JobUpdateStore.Mutable { IJobUpdateKey key, IJobInstanceUpdateEvent event) { - UpdateAndLock update = updates.get(key); + IJobUpdateDetails update = updates.get(key); if (update == null) { throw new StorageException("Update not found: " + key); } - JobUpdateDetails mutable = update.details.newBuilder(); + JobUpdateDetails mutable = update.newBuilder(); mutable.addToInstanceEvents(event.newBuilder()); mutable.setInstanceEvents(INSTANCE_EVENT_ORDERING.sortedCopy(mutable.getInstanceEvents())); mutable.getUpdate().getSummary().setState(synthesizeUpdateState(mutable)); - updates.put(key, new UpdateAndLock(IJobUpdateDetails.build(mutable), update.lockToken)); + updates.put(key, IJobUpdateDetails.build(mutable)); jobUpdateActionStats.getUnchecked(event.getAction()).incrementAndGet(); } @@ -283,7 +249,7 @@ public class MemJobUpdateStore implements JobUpdateStore.Mutable { long historyPruneThresholdMs) { 
Supplier<Stream<IJobUpdateSummary>> completedUpdates = () -> updates.values().stream() - .map(u -> u.details.getUpdate().getSummary()) + .map(u -> u.getUpdate().getSummary()) .filter(s -> TERMINAL_STATES.contains(s.getState().getStatus())); Predicate<IJobUpdateSummary> expiredFilter = @@ -311,7 +277,7 @@ public class MemJobUpdateStore implements JobUpdateStore.Mutable { pruneBuilder.addAll(creationOrder .leastOf(entry.getValue(), entry.getValue().size() - perJobRetainCount) .stream() - .map(s -> s.getKey()) + .map(IJobUpdateSummary::getKey) .iterator()); } } @@ -372,7 +338,6 @@ public class MemJobUpdateStore implements JobUpdateStore.Mutable { // TODO(wfarner): Modification time is not a stable ordering for pagination, but we use it as // such here. The behavior is carried over from DbJobupdateStore; determine if it is desired. Stream<IJobUpdateDetails> matches = updates.values().stream() - .map(u -> u.details) .filter(filter) .sorted(REVERSE_LAST_MODIFIED_ORDER) .skip(query.getOffset()); @@ -383,14 +348,4 @@ public class MemJobUpdateStore implements JobUpdateStore.Mutable { return matches; } - - private static final class UpdateAndLock { - private final IJobUpdateDetails details; - private final Optional<String> lockToken; - - UpdateAndLock(IJobUpdateDetails details, Optional<String> lockToken) { - this.details = details; - this.lockToken = lockToken; - } - } } http://git-wip-us.apache.org/repos/asf/aurora/blob/4fecf1f5/src/main/java/org/apache/aurora/scheduler/storage/mem/MemLockStore.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemLockStore.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemLockStore.java deleted file mode 100644 index 4c7bda8..0000000 --- a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemLockStore.java +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use 
this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.storage.mem; - -import java.util.Map; -import java.util.Optional; -import java.util.Set; - -import com.google.common.base.Predicates; -import com.google.common.collect.FluentIterable; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Maps; - -import org.apache.aurora.scheduler.storage.LockStore; -import org.apache.aurora.scheduler.storage.Storage.StorageException; -import org.apache.aurora.scheduler.storage.entities.ILock; -import org.apache.aurora.scheduler.storage.entities.ILockKey; - -/** - * An in-memory lock store. - */ -class MemLockStore implements LockStore.Mutable { - - private final Map<ILockKey, ILock> locks = Maps.newConcurrentMap(); - - @Override - public void saveLock(ILock lock) { - // TODO(wfarner): Re-evaluate, this is not idempotent. 
- if (locks.containsKey(lock.getKey())) { - throw new StorageException("Duplicate lock key"); - } - if (FluentIterable.from(locks.values()) - .transform(ILock::getToken) - .anyMatch(Predicates.equalTo(lock.getToken()))) { - - throw new StorageException("Duplicate token"); - } - - locks.put(lock.getKey(), lock); - } - - @Override - public void removeLock(ILockKey lockKey) { - locks.remove(lockKey); - } - - @Override - public void deleteLocks() { - locks.clear(); - } - - @Override - public Set<ILock> fetchLocks() { - return ImmutableSet.copyOf(locks.values()); - } - - @Override - public Optional<ILock> fetchLock(ILockKey lockKey) { - return Optional.ofNullable(locks.get(lockKey)); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/4fecf1f5/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorage.java index 7ace104..9f324b0 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorage.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorage.java @@ -19,7 +19,6 @@ import org.apache.aurora.common.inject.TimedInterceptor.Timed; import org.apache.aurora.scheduler.storage.AttributeStore; import org.apache.aurora.scheduler.storage.CronJobStore; import org.apache.aurora.scheduler.storage.JobUpdateStore; -import org.apache.aurora.scheduler.storage.LockStore; import org.apache.aurora.scheduler.storage.QuotaStore; import org.apache.aurora.scheduler.storage.SchedulerStore; import org.apache.aurora.scheduler.storage.Storage; @@ -36,7 +35,6 @@ public class MemStorage implements Storage { @Volatile final SchedulerStore.Mutable schedulerStore, @Volatile final CronJobStore.Mutable jobStore, @Volatile final TaskStore.Mutable taskStore, - @Volatile final LockStore.Mutable lockStore, @Volatile final 
QuotaStore.Mutable quotaStore, @Volatile final AttributeStore.Mutable attributeStore, @Volatile final JobUpdateStore.Mutable updateStore) { @@ -63,11 +61,6 @@ public class MemStorage implements Storage { } @Override - public LockStore.Mutable getLockStore() { - return lockStore; - } - - @Override public QuotaStore.Mutable getQuotaStore() { return quotaStore; } http://git-wip-us.apache.org/repos/asf/aurora/blob/4fecf1f5/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorageModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorageModule.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorageModule.java index 2ad84eb..edcea09 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorageModule.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorageModule.java @@ -30,7 +30,6 @@ import org.apache.aurora.common.stats.StatsProvider; import org.apache.aurora.scheduler.storage.AttributeStore; import org.apache.aurora.scheduler.storage.CronJobStore; import org.apache.aurora.scheduler.storage.JobUpdateStore; -import org.apache.aurora.scheduler.storage.LockStore; import org.apache.aurora.scheduler.storage.QuotaStore; import org.apache.aurora.scheduler.storage.SchedulerStore; import org.apache.aurora.scheduler.storage.Storage; @@ -74,7 +73,6 @@ public final class MemStorageModule extends PrivateModule { bindStore(TaskStore.Mutable.class, MemTaskStore.class); bindStore(CronJobStore.Mutable.class, MemCronJobStore.class); bindStore(AttributeStore.Mutable.class, MemAttributeStore.class); - bindStore(LockStore.Mutable.class, MemLockStore.class); bindStore(QuotaStore.Mutable.class, MemQuotaStore.class); bindStore(SchedulerStore.Mutable.class, MemSchedulerStore.class); bindStore(JobUpdateStore.Mutable.class, MemJobUpdateStore.class); 
http://git-wip-us.apache.org/repos/asf/aurora/blob/4fecf1f5/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java index 534ae59..2cc567d 100644 --- a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java +++ b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java @@ -438,17 +438,6 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin { return readOnlyScheduler.getTierConfigs(); } - private void validateLockForTasks(Iterable<IScheduledTask> tasks) throws JobUpdatingException { - ImmutableSet<IJobKey> uniqueKeys = FluentIterable.from(tasks) - .transform(Tasks::getJob) - .toSet(); - - // Validate lock against every unique job key derived from the tasks. - for (IJobKey key : uniqueKeys) { - jobUpdateController.assertNotUpdating(key); - } - } - private static Query.Builder implicitKillQuery(Query.Builder query) { // Unless statuses were specifically supplied, only attempt to kill active tasks. return query.get().getStatuses().isEmpty() ? 
query.byStatus(ACTIVE_STATES) : query; @@ -470,13 +459,14 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin { } return storage.write(storeProvider -> { - Iterable<IScheduledTask> tasks = storeProvider.getTaskStore().fetchTasks(query); try { - validateLockForTasks(tasks); + jobUpdateController.assertNotUpdating(jobKey); } catch (JobUpdatingException e) { return error(JOB_UPDATING_ERROR, e); } + Iterable<IScheduledTask> tasks = storeProvider.getTaskStore().fetchTasks(query); + LOG.info("Killing tasks matching " + query); int tasksKilled = 0; @@ -678,6 +668,10 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin { public Response addInstances(InstanceKey key, int count) { IJobKey jobKey = JobKeys.assertValid(IJobKey.build(key.getJobKey())); + if (count <= 0) { + return invalidRequest(INVALID_INSTANCE_COUNT); + } + Response response = empty(); return storage.write(storeProvider -> { try { @@ -690,10 +684,6 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin { FluentIterable<IScheduledTask> currentTasks = FluentIterable.from( storeProvider.getTaskStore().fetchTasks(Query.jobScoped(jobKey).active())); - if (count <= 0) { - return invalidRequest(INVALID_INSTANCE_COUNT); - } - Optional<IScheduledTask> templateTask = Iterables.tryFind( currentTasks, e -> e.getAssignedTask().getInstanceId() == key.getInstanceId()); http://git-wip-us.apache.org/repos/asf/aurora/blob/4fecf1f5/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java index 27c0b43..dc8d11c 100644 --- a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java +++ b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java @@ -48,8 +48,6 @@ import 
org.apache.aurora.scheduler.SchedulerModule.TaskEventBatchWorker; import org.apache.aurora.scheduler.base.InstanceKeys; import org.apache.aurora.scheduler.base.JobKeys; import org.apache.aurora.scheduler.base.Query; -import org.apache.aurora.scheduler.state.LockManager; -import org.apache.aurora.scheduler.state.LockManager.LockException; import org.apache.aurora.scheduler.state.StateManager; import org.apache.aurora.scheduler.storage.JobUpdateStore; import org.apache.aurora.scheduler.storage.Storage; @@ -65,7 +63,6 @@ import org.apache.aurora.scheduler.storage.entities.IJobUpdateInstructions; import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey; import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery; import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary; -import org.apache.aurora.scheduler.storage.entities.ILock; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.aurora.scheduler.updater.StateEvaluator.Failure; import org.slf4j.Logger; @@ -112,7 +109,6 @@ class JobUpdateControllerImpl implements JobUpdateController { "Unexpected problem running asynchronous updater for: %s. 
Triggering shutdown"; private final UpdateFactory updateFactory; - private final LockManager lockManager; private final Storage storage; private final ScheduledExecutorService executor; private final StateManager stateManager; @@ -130,7 +126,6 @@ class JobUpdateControllerImpl implements JobUpdateController { @Inject JobUpdateControllerImpl( UpdateFactory updateFactory, - LockManager lockManager, Storage storage, ScheduledExecutorService executor, StateManager stateManager, @@ -140,7 +135,6 @@ class JobUpdateControllerImpl implements JobUpdateController { TaskEventBatchWorker batchWorker) { this.updateFactory = requireNonNull(updateFactory); - this.lockManager = requireNonNull(lockManager); this.storage = requireNonNull(storage); this.executor = requireNonNull(executor); this.stateManager = requireNonNull(stateManager); @@ -187,16 +181,8 @@ class JobUpdateControllerImpl implements JobUpdateController { } LOG.info("Starting update for job " + job); - ILock lock; - try { - lock = lockManager.acquireLock(job, auditData.getUser()); - } catch (LockException e) { - throw new UpdateStateException(e.getMessage(), e); - } - storeProvider.getJobUpdateStore().saveJobUpdate( - update, - Optional.of(requireNonNull(lock.getToken()))); + storeProvider.getJobUpdateStore().saveJobUpdate(update); JobUpdateStatus status = ROLLING_FORWARD; if (isCoordinatedUpdate(instructions)) { @@ -474,8 +460,8 @@ class JobUpdateControllerImpl implements JobUpdateController { JobUpdateEvent proposedEvent, boolean record) throws UpdateStateException { - JobUpdateStatus status = proposedEvent.getStatus(); JobUpdateStore.Mutable updateStore = storeProvider.getJobUpdateStore(); + JobUpdateStatus status = proposedEvent.getStatus(); LOG.info("Update {} is now in state {}", key, status); if (record) { @@ -485,7 +471,6 @@ class JobUpdateControllerImpl implements JobUpdateController { } if (JobUpdateStore.TERMINAL_STATES.contains(status)) { - lockManager.releaseLock(key.getJob()); pulseHandler.remove(key); } 
else { pulseHandler.updatePulseStatus(key, status); http://git-wip-us.apache.org/repos/asf/aurora/blob/4fecf1f5/src/test/java/org/apache/aurora/scheduler/http/AbstractJettyTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/http/AbstractJettyTest.java b/src/test/java/org/apache/aurora/scheduler/http/AbstractJettyTest.java index 8301b19..a3f6941 100644 --- a/src/test/java/org/apache/aurora/scheduler/http/AbstractJettyTest.java +++ b/src/test/java/org/apache/aurora/scheduler/http/AbstractJettyTest.java @@ -59,7 +59,6 @@ import org.apache.aurora.scheduler.scheduling.RescheduleCalculator; import org.apache.aurora.scheduler.scheduling.TaskGroups; import org.apache.aurora.scheduler.scheduling.TaskGroups.TaskGroupsSettings; import org.apache.aurora.scheduler.scheduling.TaskScheduler; -import org.apache.aurora.scheduler.state.LockManager; import org.apache.aurora.scheduler.stats.StatsModule; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.entities.IServerInfo; @@ -128,7 +127,6 @@ public abstract class AbstractJettyTest extends EasyMockTest { 5)); bind(ServiceGroupMonitor.class).toInstance(serviceGroupMonitor); bindMock(CronJobManager.class); - bindMock(LockManager.class); bindMock(OfferManager.class); bindMock(RescheduleCalculator.class); bindMock(TaskScheduler.class); http://git-wip-us.apache.org/repos/asf/aurora/blob/4fecf1f5/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java deleted file mode 100644 index 8e19794..0000000 --- a/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java +++ /dev/null @@ -1,99 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the 
"License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.state; - -import java.util.UUID; - -import org.apache.aurora.common.quantity.Amount; -import org.apache.aurora.common.quantity.Time; -import org.apache.aurora.common.testing.easymock.EasyMockTest; -import org.apache.aurora.common.util.testing.FakeClock; -import org.apache.aurora.gen.Lock; -import org.apache.aurora.gen.LockKey; -import org.apache.aurora.scheduler.base.JobKeys; -import org.apache.aurora.scheduler.state.LockManager.LockException; -import org.apache.aurora.scheduler.storage.entities.IJobKey; -import org.apache.aurora.scheduler.storage.entities.ILock; -import org.apache.aurora.scheduler.storage.mem.MemStorageModule; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; - -import static org.easymock.EasyMock.expect; -import static org.junit.Assert.assertEquals; - -public class LockManagerImplTest extends EasyMockTest { - private static final String USER = "jim-user"; - private static final String MY_JOB = "myJob"; - private static final IJobKey JOB_KEY = JobKeys.from("jim", "devel", MY_JOB); - private static final UUID TOKEN = UUID.fromString("79d6d790-3212-11e3-aa6e-0800200c9a66"); - - private LockManager lockManager; - private long timestampMs; - - @Rule - public ExpectedException expectedException = ExpectedException.none(); - - @Before - public void setUp() throws Exception { - FakeClock clock = new FakeClock(); - clock.advance(Amount.of(12345L, 
Time.SECONDS)); - timestampMs = clock.nowMillis(); - - UUIDGenerator tokenGenerator = createMock(UUIDGenerator.class); - expect(tokenGenerator.createNew()).andReturn(TOKEN).anyTimes(); - - lockManager = new LockManagerImpl(MemStorageModule.newEmptyStorage(), clock, tokenGenerator); - } - - @Test - public void testAcquireLock() throws Exception { - control.replay(); - - ILock expected = ILock.build(new Lock() - .setKey(LockKey.job(JOB_KEY.newBuilder())) - .setToken(TOKEN.toString()) - .setTimestampMs(timestampMs) - .setUser(USER)); - - ILock actual = lockManager.acquireLock(JOB_KEY, USER); - assertEquals(expected, actual); - } - - @Test - public void testAcquireLockInProgress() throws Exception { - control.replay(); - - expectLockException(JOB_KEY); - lockManager.acquireLock(JOB_KEY, USER); - lockManager.acquireLock(JOB_KEY, USER); - } - - @Test - public void testReleaseLock() throws Exception { - control.replay(); - - lockManager.acquireLock(JOB_KEY, USER); - lockManager.releaseLock(JOB_KEY); - - // Should be able to lock again after releasing. - lockManager.acquireLock(JOB_KEY, USER); - } - - private void expectLockException(IJobKey key) { - expectedException.expect(LockException.class); - expectedException.expectMessage(JobKeys.canonicalString(key)); - } -}
