Repository: aurora Updated Branches: refs/heads/master 11577c703 -> 0e62780c4
Revert "Replace manual Forwarding* with `@Forward`." This reverts commit 0c98e8a82177f39534db3a49162582aeb1739728. Bugs closed: AURORA-1146 Reviewed at https://reviews.apache.org/r/41140/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/0e62780c Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/0e62780c Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/0e62780c Branch: refs/heads/master Commit: 0e62780c48ecc89571482f5b6663914147f96234 Parents: 11577c7 Author: John Sirois <[email protected]> Authored: Wed Dec 9 08:58:20 2015 -0800 Committer: Maxim Khutornenko <[email protected]> Committed: Wed Dec 9 08:58:20 2015 -0800 ---------------------------------------------------------------------- build.gradle | 3 +- .../scheduler/storage/ForwardingStore.java | 185 +++++++++++++ .../storage/log/WriteAheadStorage.java | 13 +- .../scheduler/thrift/aop/ForwardingThrift.java | 266 +++++++++++++++++++ .../thrift/aop/MockDecoratedThrift.java | 5 +- 5 files changed, 455 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/0e62780c/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index fcf8039..d4efe3a 100644 --- a/build.gradle +++ b/build.gradle @@ -354,7 +354,7 @@ dependencies { compile "log4j:log4j:${log4jRev}" compile "org.antlr:stringtemplate:${stringTemplateRev}" compile 'org.apache.mesos:mesos:0.23.0' - compile "org.apache.shiro:shiro-guice:${shiroRev}" + compile("org.apache.shiro:shiro-guice:${shiroRev}") compile "org.apache.shiro:shiro-web:${shiroRev}" compile "org.apache.zookeeper:zookeeper:${zookeeperRev}" compile "org.eclipse.jetty:jetty-rewrite:${jettyDep}" @@ -365,7 +365,6 @@ dependencies { compile 'org.mybatis:mybatis-guice:3.6' compile 'org.quartz-scheduler:quartz:2.2.1' compile 
"org.slf4j:slf4j-jdk14:${slf4jRev}" - compile "uno.perk:forward:1.0.0" testCompile "com.sun.jersey:jersey-client:${jerseyRev}" testCompile "junit:junit:${junitRev}" http://git-wip-us.apache.org/repos/asf/aurora/blob/0e62780c/src/main/java/org/apache/aurora/scheduler/storage/ForwardingStore.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/ForwardingStore.java b/src/main/java/org/apache/aurora/scheduler/storage/ForwardingStore.java new file mode 100644 index 0000000..b8bd918 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/storage/ForwardingStore.java @@ -0,0 +1,185 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.scheduler.storage; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +import com.google.common.base.Optional; + +import org.apache.aurora.gen.storage.StoredJobUpdateDetails; +import org.apache.aurora.scheduler.base.Query; +import org.apache.aurora.scheduler.storage.entities.IHostAttributes; +import org.apache.aurora.scheduler.storage.entities.IJobConfiguration; +import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent; +import org.apache.aurora.scheduler.storage.entities.IJobKey; +import org.apache.aurora.scheduler.storage.entities.IJobUpdate; +import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails; +import org.apache.aurora.scheduler.storage.entities.IJobUpdateInstructions; +import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey; +import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery; +import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary; +import org.apache.aurora.scheduler.storage.entities.ILock; +import org.apache.aurora.scheduler.storage.entities.ILockKey; +import org.apache.aurora.scheduler.storage.entities.IResourceAggregate; +import org.apache.aurora.scheduler.storage.entities.IScheduledTask; + +import static java.util.Objects.requireNonNull; + +/** + * A store that forwards all its operations to underlying storage systems. Useful for decorating + * an existing storage system. + */ +public class ForwardingStore implements + SchedulerStore, + CronJobStore, + TaskStore, + LockStore, + QuotaStore, + AttributeStore, + JobUpdateStore { + + private final SchedulerStore schedulerStore; + private final CronJobStore cronJobStore; + private final TaskStore taskStore; + private final LockStore lockStore; + private final QuotaStore quotaStore; + private final AttributeStore attributeStore; + private final JobUpdateStore jobUpdateStore; + + /** + * Creates a new forwarding store that delegates to the provided default stores. 
+ * + * @param schedulerStore Delegate. + * @param cronJobStore Delegate. + * @param taskStore Delegate. + * @param lockStore Delegate. + * @param quotaStore Delegate. + * @param attributeStore Delegate. + * @param jobUpdateStore Delegate. + */ + public ForwardingStore( + SchedulerStore schedulerStore, + CronJobStore cronJobStore, + TaskStore taskStore, + LockStore lockStore, + QuotaStore quotaStore, + AttributeStore attributeStore, + JobUpdateStore jobUpdateStore) { + + this.schedulerStore = requireNonNull(schedulerStore); + this.cronJobStore = requireNonNull(cronJobStore); + this.taskStore = requireNonNull(taskStore); + this.lockStore = requireNonNull(lockStore); + this.quotaStore = requireNonNull(quotaStore); + this.attributeStore = requireNonNull(attributeStore); + this.jobUpdateStore = requireNonNull(jobUpdateStore); + } + + @Override + public Optional<String> fetchFrameworkId() { + return schedulerStore.fetchFrameworkId(); + } + + @Override + public Iterable<IJobConfiguration> fetchJobs() { + return cronJobStore.fetchJobs(); + } + + @Override + public Optional<IJobConfiguration> fetchJob(IJobKey jobKey) { + return cronJobStore.fetchJob(jobKey); + } + + @Override + public Iterable<IScheduledTask> fetchTasks(Query.Builder querySupplier) { + return taskStore.fetchTasks(querySupplier); + } + + @Override + public Set<IJobKey> getJobKeys() { + return taskStore.getJobKeys(); + } + + @Override + public Set<ILock> fetchLocks() { + return lockStore.fetchLocks(); + } + + @Override + public java.util.Optional<ILock> fetchLock(ILockKey lockKey) { + return lockStore.fetchLock(lockKey); + } + + @Override + public Map<String, IResourceAggregate> fetchQuotas() { + return quotaStore.fetchQuotas(); + } + + @Override + public Optional<IResourceAggregate> fetchQuota(String role) { + return quotaStore.fetchQuota(role); + } + + @Override + public Optional<IHostAttributes> getHostAttributes(String host) { + return attributeStore.getHostAttributes(host); + } + + @Override + public 
Set<IHostAttributes> getHostAttributes() { + return attributeStore.getHostAttributes(); + } + + @Override + public List<IJobUpdateSummary> fetchJobUpdateSummaries(IJobUpdateQuery query) { + return jobUpdateStore.fetchJobUpdateSummaries(query); + } + + @Override + public Optional<IJobUpdateDetails> fetchJobUpdateDetails(IJobUpdateKey key) { + return jobUpdateStore.fetchJobUpdateDetails(key); + } + + @Override + public List<IJobUpdateDetails> fetchJobUpdateDetails(IJobUpdateQuery query) { + return jobUpdateStore.fetchJobUpdateDetails(query); + } + + @Override + public Optional<IJobUpdate> fetchJobUpdate(IJobUpdateKey key) { + return jobUpdateStore.fetchJobUpdate(key); + } + + @Override + public Optional<IJobUpdateInstructions> fetchJobUpdateInstructions(IJobUpdateKey key) { + return jobUpdateStore.fetchJobUpdateInstructions(key); + } + + @Override + public Set<StoredJobUpdateDetails> fetchAllJobUpdateDetails() { + return jobUpdateStore.fetchAllJobUpdateDetails(); + } + + @Override + public Optional<String> getLockToken(IJobUpdateKey key) { + return jobUpdateStore.getLockToken(key); + } + + @Override + public List<IJobInstanceUpdateEvent> fetchInstanceEvents(IJobUpdateKey key, int instanceId) { + return jobUpdateStore.fetchInstanceEvents(key, instanceId); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/0e62780c/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java index 2d34f36..89dd8aa 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java @@ -46,6 +46,7 @@ import org.apache.aurora.scheduler.events.EventSink; import org.apache.aurora.scheduler.events.PubsubEvent; import 
org.apache.aurora.scheduler.storage.AttributeStore; import org.apache.aurora.scheduler.storage.CronJobStore; +import org.apache.aurora.scheduler.storage.ForwardingStore; import org.apache.aurora.scheduler.storage.JobUpdateStore; import org.apache.aurora.scheduler.storage.LockStore; import org.apache.aurora.scheduler.storage.QuotaStore; @@ -65,8 +66,6 @@ import org.apache.aurora.scheduler.storage.entities.IResourceAggregate; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.aurora.scheduler.storage.entities.ITaskConfig; -import uno.perk.forward.Forward; - import static java.util.Objects.requireNonNull; import static org.apache.aurora.scheduler.storage.log.LogStorage.TransactionManager; @@ -76,15 +75,7 @@ import static org.apache.aurora.scheduler.storage.log.LogStorage.TransactionMana * to a provided {@link TransactionManager}) before forwarding the operations to delegate mutable * stores. */ -@Forward({ - SchedulerStore.class, - CronJobStore.class, - TaskStore.class, - LockStore.class, - QuotaStore.class, - AttributeStore.class, - JobUpdateStore.class}) -class WriteAheadStorage extends WriteAheadStorageForwarder implements +class WriteAheadStorage extends ForwardingStore implements MutableStoreProvider, SchedulerStore.Mutable, CronJobStore.Mutable, http://git-wip-us.apache.org/repos/asf/aurora/blob/0e62780c/src/test/java/org/apache/aurora/scheduler/thrift/aop/ForwardingThrift.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/aop/ForwardingThrift.java b/src/test/java/org/apache/aurora/scheduler/thrift/aop/ForwardingThrift.java new file mode 100644 index 0000000..2de1783 --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/thrift/aop/ForwardingThrift.java @@ -0,0 +1,266 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.thrift.aop; + +import java.util.Set; + +import org.apache.aurora.gen.AddInstancesConfig; +import org.apache.aurora.gen.Hosts; +import org.apache.aurora.gen.JobConfiguration; +import org.apache.aurora.gen.JobKey; +import org.apache.aurora.gen.JobUpdateKey; +import org.apache.aurora.gen.JobUpdateQuery; +import org.apache.aurora.gen.JobUpdateRequest; +import org.apache.aurora.gen.Lock; +import org.apache.aurora.gen.LockKey; +import org.apache.aurora.gen.LockValidation; +import org.apache.aurora.gen.ResourceAggregate; +import org.apache.aurora.gen.Response; +import org.apache.aurora.gen.RewriteConfigsRequest; +import org.apache.aurora.gen.ScheduleStatus; +import org.apache.aurora.gen.TaskQuery; +import org.apache.thrift.TException; + +import static java.util.Objects.requireNonNull; + +/** + * A forwarding scheduler controller to make it easy to override specific behavior in an + * implementation class. 
+ */ +abstract class ForwardingThrift implements AnnotatedAuroraAdmin { + + private final AnnotatedAuroraAdmin delegate; + + ForwardingThrift(AnnotatedAuroraAdmin delegate) { + this.delegate = requireNonNull(delegate); + } + + @Override + public Response setQuota( + String ownerRole, + ResourceAggregate resourceAggregate) throws TException { + + return delegate.setQuota(ownerRole, resourceAggregate); + } + + @Override + public Response forceTaskState(String taskId, ScheduleStatus status) throws TException { + return delegate.forceTaskState(taskId, status); + } + + @Override + public Response performBackup() throws TException { + return delegate.performBackup(); + } + + @Override + public Response listBackups() throws TException { + return delegate.listBackups(); + } + + @Override + public Response stageRecovery(String backupId) throws TException { + return delegate.stageRecovery(backupId); + } + + @Override + public Response queryRecovery(TaskQuery query) throws TException { + return delegate.queryRecovery(query); + } + + @Override + public Response deleteRecoveryTasks(TaskQuery query) throws TException { + return delegate.deleteRecoveryTasks(query); + } + + @Override + public Response commitRecovery() throws TException { + return delegate.commitRecovery(); + } + + @Override + public Response unloadRecovery() throws TException { + return delegate.unloadRecovery(); + } + + @Override + public Response getRoleSummary() throws TException { + return delegate.getRoleSummary(); + } + + @Override + public Response getJobSummary(String role) throws TException { + return delegate.getJobSummary(role); + } + + @Override + public Response getConfigSummary(JobKey key) throws TException { + return delegate.getConfigSummary(key); + } + + @Override + public Response createJob(JobConfiguration description, Lock lock) throws TException { + return delegate.createJob(description, lock); + } + + @Override + public Response scheduleCronJob(JobConfiguration description, Lock lock) throws 
TException { + return delegate.scheduleCronJob(description, lock); + } + + @Override + public Response descheduleCronJob(JobKey job, Lock lock) throws TException { + return delegate.descheduleCronJob(job, lock); + } + + @Override + public Response replaceCronTemplate(JobConfiguration config, Lock lock) throws TException { + return delegate.replaceCronTemplate(config, lock); + } + + @Override + public Response populateJobConfig(JobConfiguration description) throws TException { + return delegate.populateJobConfig(description); + } + + @Override + public Response startCronJob(JobKey job) throws TException { + return delegate.startCronJob(job); + } + + @Override + public Response restartShards(JobKey job, Set<Integer> shardIds, Lock lock) throws TException { + return delegate.restartShards(job, shardIds, lock); + } + + @Override + public Response getTasksStatus(TaskQuery query) throws TException { + return delegate.getTasksStatus(query); + } + + @Override + public Response getTasksWithoutConfigs(TaskQuery query) throws TException { + return delegate.getTasksStatus(query); + } + + @Override + public Response getJobs(String ownerRole) throws TException { + return delegate.getJobs(ownerRole); + } + + @Override + public Response killTasks(TaskQuery query, Lock lock) throws TException { + return delegate.killTasks(query, lock); + } + + @Override + public Response getQuota(String ownerRole) throws TException { + return delegate.getQuota(ownerRole); + } + + @Override + public Response startMaintenance(Hosts hosts) throws TException { + return delegate.startMaintenance(hosts); + } + + @Override + public Response drainHosts(Hosts hosts) throws TException { + return delegate.drainHosts(hosts); + } + + @Override + public Response maintenanceStatus(Hosts hosts) throws TException { + return delegate.maintenanceStatus(hosts); + } + + @Override + public Response endMaintenance(Hosts hosts) throws TException { + return delegate.endMaintenance(hosts); + } + + @Override + public 
Response snapshot() throws TException { + return delegate.snapshot(); + } + + @Override + public Response rewriteConfigs(RewriteConfigsRequest request) throws TException { + return delegate.rewriteConfigs(request); + } + + @Override + public Response acquireLock(LockKey lockKey) throws TException { + return delegate.acquireLock(lockKey); + } + + @Override + public Response releaseLock(Lock lock, LockValidation validation) throws TException { + return delegate.releaseLock(lock, validation); + } + + @Override + public Response getLocks() throws TException { + return delegate.getLocks(); + } + + @Override + public Response addInstances(AddInstancesConfig config, Lock lock) throws TException { + return delegate.addInstances(config, lock); + } + + @Override + public Response getPendingReason(TaskQuery query) throws TException { + return delegate.getPendingReason(query); + } + + @Override + public Response startJobUpdate(JobUpdateRequest request, String message) throws TException { + return delegate.startJobUpdate(request, message); + } + + @Override + public Response pauseJobUpdate(JobUpdateKey key, String message) throws TException { + return delegate.pauseJobUpdate(key, message); + } + + @Override + public Response resumeJobUpdate(JobUpdateKey key, String message) throws TException { + return delegate.resumeJobUpdate(key, message); + } + + @Override + public Response abortJobUpdate(JobUpdateKey key, String message) throws TException { + return delegate.abortJobUpdate(key, message); + } + + @Override + public Response pulseJobUpdate(JobUpdateKey key) throws TException { + return delegate.pulseJobUpdate(key); + } + + @Override + public Response getJobUpdateSummaries(JobUpdateQuery updateQuery) throws TException { + return delegate.getJobUpdateSummaries(updateQuery); + } + + @Override + public Response getJobUpdateDetails(JobUpdateKey key) throws TException { + return delegate.getJobUpdateDetails(key); + } + + @Override + public Response getJobUpdateDiff(JobUpdateRequest 
request) throws TException { + return delegate.getJobUpdateDiff(request); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/0e62780c/src/test/java/org/apache/aurora/scheduler/thrift/aop/MockDecoratedThrift.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/aop/MockDecoratedThrift.java b/src/test/java/org/apache/aurora/scheduler/thrift/aop/MockDecoratedThrift.java index 0547b1a..1415f0c 100644 --- a/src/test/java/org/apache/aurora/scheduler/thrift/aop/MockDecoratedThrift.java +++ b/src/test/java/org/apache/aurora/scheduler/thrift/aop/MockDecoratedThrift.java @@ -26,8 +26,6 @@ import com.google.inject.Binder; import org.apache.aurora.gen.AuroraAdmin; import org.apache.aurora.scheduler.thrift.auth.DecoratedThrift; -import uno.perk.forward.Forward; - /** * An injected forwarding thrift implementation that delegates to a bound mock interface. * <p> @@ -35,8 +33,7 @@ import uno.perk.forward.Forward; * https://code.google.com/p/google-guice/wiki/AOP#Limitations */ @DecoratedThrift -@Forward(AnnotatedAuroraAdmin.class) -public class MockDecoratedThrift extends MockDecoratedThriftForwarder { +public class MockDecoratedThrift extends ForwardingThrift { @Retention(RetentionPolicy.RUNTIME) @Target({ElementType.PARAMETER, ElementType.METHOD})
