Repository: aurora Updated Branches: refs/heads/master d9acfeb76 -> aaadad7c5
Refactor ClusterState to more appropriate package, move binding to StateModule Browsing through the code and I noticed that if preemption is turned off, the `/state` endpoint will not work since `ClusterState` is not bound. I moved `ClusterState` and `ClusterStateImpl` to a more suitable package, and bind `ClusterState` in `StateModule` no matter what. Reviewed at https://reviews.apache.org/r/66074/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/aaadad7c Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/aaadad7c Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/aaadad7c Branch: refs/heads/master Commit: aaadad7c57b0cf060709fdc5fef4accd290886f9 Parents: d9acfeb Author: Jordan Ly <[email protected]> Authored: Sat Mar 17 22:00:21 2018 -0700 Committer: Jordan Ly <[email protected]> Committed: Sat Mar 17 22:00:21 2018 -0700 ---------------------------------------------------------------------- .../aurora/benchmark/SchedulingBenchmarks.java | 2 +- .../aurora/benchmark/StatusUpdateBenchmark.java | 2 +- .../org/apache/aurora/scheduler/http/State.java | 4 +- .../scheduler/preemptor/ClusterState.java | 34 ------------- .../scheduler/preemptor/ClusterStateImpl.java | 51 ------------------- .../preemptor/PendingTaskProcessor.java | 1 + .../scheduler/preemptor/PreemptorModule.java | 4 +- .../aurora/scheduler/state/ClusterState.java | 32 ++++++++++++ .../scheduler/state/ClusterStateImpl.java | 52 ++++++++++++++++++++ .../aurora/scheduler/state/StateModule.java | 3 ++ .../apache/aurora/scheduler/http/StateTest.java | 2 +- .../preemptor/ClusterStateImplTest.java | 1 + .../preemptor/PendingTaskProcessorTest.java | 1 + 13 files changed, 96 insertions(+), 93 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/aaadad7c/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java ---------------------------------------------------------------------- diff --git a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java index 54b6ed9..1f9a576 100644 --- a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java +++ b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java @@ -69,12 +69,12 @@ import org.apache.aurora.scheduler.offers.OfferOrderBuilder; import org.apache.aurora.scheduler.offers.OfferSetImpl; import org.apache.aurora.scheduler.offers.OfferSettings; import org.apache.aurora.scheduler.preemptor.BiCache; -import org.apache.aurora.scheduler.preemptor.ClusterStateImpl; import org.apache.aurora.scheduler.preemptor.PreemptorModule; import org.apache.aurora.scheduler.scheduling.RescheduleCalculator; import org.apache.aurora.scheduler.scheduling.TaskScheduler; import org.apache.aurora.scheduler.scheduling.TaskSchedulerImpl; import org.apache.aurora.scheduler.scheduling.TaskSchedulerImpl.ReservationDuration; +import org.apache.aurora.scheduler.state.ClusterStateImpl; import org.apache.aurora.scheduler.state.StateModule; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; http://git-wip-us.apache.org/repos/asf/aurora/blob/aaadad7c/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java ---------------------------------------------------------------------- diff --git a/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java b/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java index 37374dc..1c6c40a 100644 --- a/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java +++ b/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java @@ -70,8 +70,8 @@ import org.apache.aurora.scheduler.mesos.ProtosConversion; import org.apache.aurora.scheduler.mesos.SchedulerDriverModule; import org.apache.aurora.scheduler.mesos.TestExecutorSettings; import org.apache.aurora.scheduler.offers.OfferManager; -import org.apache.aurora.scheduler.preemptor.ClusterStateImpl; import org.apache.aurora.scheduler.scheduling.RescheduleCalculator; +import org.apache.aurora.scheduler.state.ClusterStateImpl; import org.apache.aurora.scheduler.state.StateModule; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; http://git-wip-us.apache.org/repos/asf/aurora/blob/aaadad7c/src/main/java/org/apache/aurora/scheduler/http/State.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/http/State.java b/src/main/java/org/apache/aurora/scheduler/http/State.java index 6d1b400..5b3b7c5 100644 --- a/src/main/java/org/apache/aurora/scheduler/http/State.java +++ b/src/main/java/org/apache/aurora/scheduler/http/State.java @@ -29,9 +29,9 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Multimap; -import org.apache.aurora.scheduler.preemptor.ClusterState; -import org.apache.aurora.scheduler.preemptor.ClusterStateImpl; import org.apache.aurora.scheduler.preemptor.PreemptionVictim; +import org.apache.aurora.scheduler.state.ClusterState; +import org.apache.aurora.scheduler.state.ClusterStateImpl; import org.apache.aurora.scheduler.storage.entities.ITaskConfig; import static java.util.Objects.requireNonNull; http://git-wip-us.apache.org/repos/asf/aurora/blob/aaadad7c/src/main/java/org/apache/aurora/scheduler/preemptor/ClusterState.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/ClusterState.java b/src/main/java/org/apache/aurora/scheduler/preemptor/ClusterState.java deleted file mode 100644 index ce3bc7e..0000000 --- a/src/main/java/org/apache/aurora/scheduler/preemptor/ClusterState.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.preemptor; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Multimap; - -/** - * A facade for the preemptor to gain access to the state of scheduled tasks in the cluster. - */ -@VisibleForTesting -public interface ClusterState { - - /** - * Gets a snapshot of the active tasks in the cluster, indexed by the slave IDs they are - * assigned to. - * <p> - * TODO(wfarner): Return a more minimal type than IAssignedTask here. - * - * @return Active tasks and their associated slave IDs. - */ - Multimap<String, PreemptionVictim> getSlavesToActiveTasks(); -} http://git-wip-us.apache.org/repos/asf/aurora/blob/aaadad7c/src/main/java/org/apache/aurora/scheduler/preemptor/ClusterStateImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/ClusterStateImpl.java b/src/main/java/org/apache/aurora/scheduler/preemptor/ClusterStateImpl.java deleted file mode 100644 index 5574e9b..0000000 --- a/src/main/java/org/apache/aurora/scheduler/preemptor/ClusterStateImpl.java +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.preemptor; - -import com.google.common.collect.HashMultimap; -import com.google.common.collect.ImmutableSetMultimap; -import com.google.common.collect.Multimap; -import com.google.common.eventbus.Subscribe; - -import org.apache.aurora.scheduler.base.Tasks; -import org.apache.aurora.scheduler.events.PubsubEvent; -import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; - -/** - * A cached view of cluster state, kept up to date by pubsub notifications. - */ -public class ClusterStateImpl implements ClusterState, PubsubEvent.EventSubscriber { - - private final Multimap<String, PreemptionVictim> victims = HashMultimap.create(); - - @Override - public Multimap<String, PreemptionVictim> getSlavesToActiveTasks() { - synchronized (victims) { - return ImmutableSetMultimap.copyOf(victims); - } - } - - @Subscribe - public void taskChangedState(TaskStateChange stateChange) { - synchronized (victims) { - String slaveId = stateChange.getTask().getAssignedTask().getSlaveId(); - PreemptionVictim victim = PreemptionVictim.fromTask(stateChange.getTask().getAssignedTask()); - if (Tasks.SLAVE_ASSIGNED_STATES.contains(stateChange.getNewState())) { - victims.put(slaveId, victim); - } else { - victims.remove(slaveId, victim); - } - } - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/aaadad7c/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessor.java b/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessor.java index 056e466..ef06471 100644 --- a/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessor.java +++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessor.java @@ -55,6 +55,7 @@ import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.filter.AttributeAggregate; import org.apache.aurora.scheduler.offers.HostOffer; import org.apache.aurora.scheduler.offers.OfferManager; +import org.apache.aurora.scheduler.state.ClusterState; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.Storage.StoreProvider; import org.apache.aurora.scheduler.storage.entities.IAssignedTask; http://git-wip-us.apache.org/repos/asf/aurora/blob/aaadad7c/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptorModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptorModule.java b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptorModule.java index 4de5ef8..7618efc 100644 --- a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptorModule.java +++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptorModule.java @@ -42,6 +42,7 @@ import org.apache.aurora.scheduler.config.types.TimeAmount; import org.apache.aurora.scheduler.config.validators.PositiveNumber; import org.apache.aurora.scheduler.events.PubsubEventModule; import org.apache.aurora.scheduler.preemptor.BiCache.BiCacheSettings; +import org.apache.aurora.scheduler.state.ClusterStateImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -123,9 +124,6 @@ public class PreemptorModule extends AbstractModule { bind(new TypeLiteral<Integer>() { }) .annotatedWith(PendingTaskProcessor.ReservationBatchSize.class) .toInstance(options.reservationMaxBatchSize); - bind(ClusterState.class).to(ClusterStateImpl.class); - bind(ClusterStateImpl.class).in(Singleton.class); - expose(ClusterStateImpl.class); for (Module module: MoreModules.instantiateAll(options.slotFinderModules, cliOptions)) { install(module); http://git-wip-us.apache.org/repos/asf/aurora/blob/aaadad7c/src/main/java/org/apache/aurora/scheduler/state/ClusterState.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/state/ClusterState.java b/src/main/java/org/apache/aurora/scheduler/state/ClusterState.java new file mode 100644 index 0000000..527cfd6 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/state/ClusterState.java @@ -0,0 +1,32 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.state; + +import com.google.common.collect.Multimap; + +import org.apache.aurora.scheduler.preemptor.PreemptionVictim; + +/** + * The current state of scheduled tasks within the cluster. + */ +public interface ClusterState { + + /** + * Gets a snapshot of the active tasks in the cluster, indexed by the slave IDs they are + * assigned to. + * + * @return Active tasks and their associated slave IDs. + */ + Multimap<String, PreemptionVictim> getSlavesToActiveTasks(); +} http://git-wip-us.apache.org/repos/asf/aurora/blob/aaadad7c/src/main/java/org/apache/aurora/scheduler/state/ClusterStateImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/state/ClusterStateImpl.java b/src/main/java/org/apache/aurora/scheduler/state/ClusterStateImpl.java new file mode 100644 index 0000000..d804198 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/state/ClusterStateImpl.java @@ -0,0 +1,52 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.state; + +import com.google.common.collect.HashMultimap; +import com.google.common.collect.ImmutableSetMultimap; +import com.google.common.collect.Multimap; +import com.google.common.eventbus.Subscribe; + +import org.apache.aurora.scheduler.base.Tasks; +import org.apache.aurora.scheduler.events.PubsubEvent; +import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; +import org.apache.aurora.scheduler.preemptor.PreemptionVictim; + +/** + * A cached view of cluster state, kept up to date by pubsub notifications. + */ +public class ClusterStateImpl implements ClusterState, PubsubEvent.EventSubscriber { + + private final Multimap<String, PreemptionVictim> victims = HashMultimap.create(); + + @Override + public Multimap<String, PreemptionVictim> getSlavesToActiveTasks() { + synchronized (victims) { + return ImmutableSetMultimap.copyOf(victims); + } + } + + @Subscribe + public void taskChangedState(TaskStateChange stateChange) { + synchronized (victims) { + String slaveId = stateChange.getTask().getAssignedTask().getSlaveId(); + PreemptionVictim victim = PreemptionVictim.fromTask(stateChange.getTask().getAssignedTask()); + if (Tasks.SLAVE_ASSIGNED_STATES.contains(stateChange.getNewState())) { + victims.put(slaveId, victim); + } else { + victims.remove(slaveId, victim); + } + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/aaadad7c/src/main/java/org/apache/aurora/scheduler/state/StateModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateModule.java b/src/main/java/org/apache/aurora/scheduler/state/StateModule.java index 76c3277..0e0f90b 100644 --- a/src/main/java/org/apache/aurora/scheduler/state/StateModule.java +++ b/src/main/java/org/apache/aurora/scheduler/state/StateModule.java @@ -70,6 +70,9 @@ public class StateModule extends AbstractModule { PubsubEventModule.bindSubscriber(binder(), PartitionManager.class); bindMaintenanceController(binder()); + + bind(ClusterState.class).to(ClusterStateImpl.class); + bind(ClusterStateImpl.class).in(Singleton.class); } @VisibleForTesting http://git-wip-us.apache.org/repos/asf/aurora/blob/aaadad7c/src/test/java/org/apache/aurora/scheduler/http/StateTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/http/StateTest.java b/src/test/java/org/apache/aurora/scheduler/http/StateTest.java index 0685d6e..852c2f8 100644 --- a/src/test/java/org/apache/aurora/scheduler/http/StateTest.java +++ b/src/test/java/org/apache/aurora/scheduler/http/StateTest.java @@ -29,8 +29,8 @@ import org.apache.aurora.common.testing.easymock.EasyMockTest; import org.apache.aurora.gen.AssignedTask; import org.apache.aurora.gen.JobKey; import org.apache.aurora.gen.TaskConfig; -import org.apache.aurora.scheduler.preemptor.ClusterStateImpl; import org.apache.aurora.scheduler.preemptor.PreemptionVictim; +import org.apache.aurora.scheduler.state.ClusterStateImpl; import org.apache.aurora.scheduler.storage.entities.IAssignedTask; import org.apache.aurora.scheduler.storage.entities.ITaskConfig; import org.junit.Before; http://git-wip-us.apache.org/repos/asf/aurora/blob/aaadad7c/src/test/java/org/apache/aurora/scheduler/preemptor/ClusterStateImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/preemptor/ClusterStateImplTest.java b/src/test/java/org/apache/aurora/scheduler/preemptor/ClusterStateImplTest.java index 881bb20..8498721 100644 --- a/src/test/java/org/apache/aurora/scheduler/preemptor/ClusterStateImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/preemptor/ClusterStateImplTest.java @@ -22,6 +22,7 @@ import org.apache.aurora.gen.ScheduleStatus; import org.apache.aurora.gen.ScheduledTask; import org.apache.aurora.gen.TaskConfig; import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; +import org.apache.aurora.scheduler.state.ClusterStateImpl; import org.apache.aurora.scheduler.storage.entities.IAssignedTask; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.junit.Before; http://git-wip-us.apache.org/repos/asf/aurora/blob/aaadad7c/src/test/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessorTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessorTest.java b/src/test/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessorTest.java index 35e9348..ba775f4 100644 --- a/src/test/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessorTest.java +++ b/src/test/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessorTest.java @@ -40,6 +40,7 @@ import org.apache.aurora.scheduler.base.TaskGroupKey; import org.apache.aurora.scheduler.filter.AttributeAggregate; import org.apache.aurora.scheduler.offers.HostOffer; import org.apache.aurora.scheduler.offers.OfferManager; +import org.apache.aurora.scheduler.state.ClusterState; import org.apache.aurora.scheduler.stats.CachedCounters; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; import org.apache.aurora.scheduler.storage.entities.IJobKey;
