SLIDER-965 RoleHistory to (carefully) share RoleStatus instances with AppState
Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/5b7f6dde Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/5b7f6dde Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/5b7f6dde Branch: refs/heads/feature/SLIDER-82-pass-3.1 Commit: 5b7f6dde5cde1856c6a870db024020619b7b0d94 Parents: 8f2786c Author: Steve Loughran <[email protected]> Authored: Fri Nov 6 17:57:59 2015 +0000 Committer: Steve Loughran <[email protected]> Committed: Fri Nov 6 17:57:59 2015 +0000 ---------------------------------------------------------------------- .../slider/core/conf/ConfTreeOperations.java | 9 ++ .../server/appmaster/management/LongGauge.java | 64 +++----- .../slider/server/appmaster/state/AppState.java | 66 ++++----- .../appmaster/state/ContainerAllocation.java | 50 ------- .../state/ContainerAllocationResults.java | 50 +++++++ .../state/ContainerReleaseSelector.java | 5 +- .../MostRecentContainerReleaseSelector.java | 3 +- .../state/OutstandingRequestTracker.java | 4 +- .../server/appmaster/state/RoleHistory.java | 76 +++++----- .../server/appmaster/state/RoleStatus.java | 147 ++++++++++--------- .../appmaster/state/SimpleReleaseSelector.java | 3 +- .../appmaster/web/view/ContainerStatsBlock.java | 3 +- .../appstate/TestMockAppStateAAPlacement.groovy | 42 +----- .../TestMockAppStateDynamicHistory.groovy | 16 +- .../TestMockAppStateDynamicRoles.groovy | 39 ++--- .../TestRoleHistoryContainerEvents.groovy | 3 +- ...stRoleHistoryFindNodesForNewInstances.groovy | 3 +- .../history/TestRoleHistoryNIComparators.groovy | 12 +- .../model/history/TestRoleHistoryRW.groovy | 29 ++-- .../history/TestRoleHistoryRWOrdering.groovy | 25 ++-- .../TestRoleHistoryRequestTracking.groovy | 3 +- .../model/mock/BaseMockAppStateTest.groovy | 93 ++++++++---- .../appmaster/model/mock/MockRoleHistory.groovy | 12 +- 23 files changed, 356 insertions(+), 401 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5b7f6dde/slider-core/src/main/java/org/apache/slider/core/conf/ConfTreeOperations.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/core/conf/ConfTreeOperations.java b/slider-core/src/main/java/org/apache/slider/core/conf/ConfTreeOperations.java index bc116e7..58896ee 100644 --- a/slider-core/src/main/java/org/apache/slider/core/conf/ConfTreeOperations.java +++ b/slider-core/src/main/java/org/apache/slider/core/conf/ConfTreeOperations.java @@ -432,5 +432,14 @@ public class ConfTreeOperations { public void setComponentOpt(String role, String option, int val) { setComponentOpt(role, option, Integer.toString(val)); } + /** + * Set a long role option, creating the role if necessary + * @param role role name + * @param option option name + * @param val long value + */ + public void setComponentOpt(String role, String option, long val) { + setComponentOpt(role, option, Long.toString(val)); + } } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5b7f6dde/slider-core/src/main/java/org/apache/slider/server/appmaster/management/LongGauge.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/management/LongGauge.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/management/LongGauge.java index 08f61ec..72a8805 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/management/LongGauge.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/management/LongGauge.java @@ -24,69 +24,43 @@ import com.codahale.metrics.Metric; import java.util.concurrent.atomic.AtomicLong; /** - * This is a long which acts as a gauge + * This is a {@link AtomicLong} which acts as a metrics gauge: its state can be exposed as + * a management value. + * */ -public class LongGauge implements Metric, Gauge<Long> { - - private final AtomicLong value; +public class LongGauge extends AtomicLong implements Metric, Gauge<Long> { /** * Instantiate * @param val current value */ public LongGauge(long val) { - this.value = new AtomicLong(val); - } - - public LongGauge() { - this(0); + super(val); } /** - * Set to a new value. - * @param val value + * Instantiate with value 0 */ - public synchronized void set(long val) { - value.set(val); - } - - public void inc() { - inc(1); - } - - public void dec() { - dec(1); - } - - public synchronized void inc(int delta) { - set(value.get() + delta); - } - - public synchronized void dec(int delta) { - set(value.get() - delta); + public LongGauge() { + this(0); } - public long get() { - return value.get(); - } @Override public Long getValue() { return get(); } - @Override - public String toString() { - return value.toString(); - } - - @Override - public int hashCode() { - return value.hashCode(); - } - - @Override - public boolean equals(Object obj) { - return value.equals(obj); + /** + * Decrement to the floor of 0. + * There's checks to stop more than one thread being in this method at the time, but + * that doesn't stop other operations on the value + * @param delta delta + * @return the current value + */ + public synchronized long decToFloor(long delta) { + long newval = Math.max(0L, get() - delta); + set(newval); + return get(); } } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5b7f6dde/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java index 29d5cde..c46177a 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java @@ -554,7 +554,7 @@ public class AppState { // set up the role history - roleHistory = new RoleHistory(roleList); + roleHistory = new RoleHistory(roleStatusMap.values()); roleHistory.register(metricsAndMonitoring); roleHistory.onStart(binding.fs, binding.historyPath); // trigger first node update @@ -665,12 +665,9 @@ public class AppState { //snapshot all three sectons - resourcesSnapshot = - ConfTreeOperations.fromInstance(instanceDefinition.getResources()); - appConfSnapshot = - ConfTreeOperations.fromInstance(instanceDefinition.getAppConf()); - internalsSnapshot = - ConfTreeOperations.fromInstance(instanceDefinition.getInternal()); + resourcesSnapshot = ConfTreeOperations.fromInstance(instanceDefinition.getResources()); + appConfSnapshot = ConfTreeOperations.fromInstance(instanceDefinition.getAppConf()); + internalsSnapshot = ConfTreeOperations.fromInstance(instanceDefinition.getInternal()); //build a new aggregate from the snapshots instanceDefinitionSnapshot = new AggregateConf(resourcesSnapshot.confTree, appConfSnapshot.confTree, @@ -681,7 +678,7 @@ public class AppState { ClusterDescriptionOperations.buildFromInstanceDefinition( instanceDefinition); -// Add the -site configuration properties + // Add the -site configuration properties for (Map.Entry<String, String> prop : clientProperties.entrySet()) { clusterStatusTemplate.clientProperties.put(prop.getKey(), prop.getValue()); } @@ -694,6 +691,7 @@ public class AppState { * @return a list of any dynamically added provider roles * (purely for testing purposes) */ + @VisibleForTesting public synchronized List<ProviderRole> updateResourceDefinitions(ConfTree resources) throws BadConfigException, IOException { log.debug("Updating resources to {}", resources); @@ -733,10 +731,8 @@ public class AppState { // skip inflexible roles, e.g AM itself continue; } - int currentDesired = roleStatus.getDesired(); + long currentDesired = roleStatus.getDesired(); String role = roleStatus.getName(); - MapOperations comp = - resources.getComponent(role); int desiredInstanceCount = getDesiredInstanceCount(resources, role); if (desiredInstanceCount == 0) { log.info("Role {} has 0 instances specified", role); @@ -756,12 +752,11 @@ public class AppState { // this is a new value log.info("Adding new role {}", name); MapOperations component = resources.getComponent(name); - ProviderRole dynamicRole = createDynamicProviderRole(name, - component); + ProviderRole dynamicRole = createDynamicProviderRole(name, component); RoleStatus roleStatus = buildRole(dynamicRole); roleStatus.setDesired(getDesiredInstanceCount(resources, name)); log.info("New role {}", roleStatus); - roleHistory.addNewProviderRole(dynamicRole); + roleHistory.addNewRole(roleStatus); newRoles.add(dynamicRole); } } @@ -842,8 +837,7 @@ public class AppState { putOwnedContainer(containerId, am); // patch up the role status - RoleStatus roleStatus = roleStatusMap.get( - (SliderKeys.ROLE_AM_PRIORITY_INDEX)); + RoleStatus roleStatus = roleStatusMap.get(SliderKeys.ROLE_AM_PRIORITY_INDEX); roleStatus.setDesired(1); roleStatus.incActual(); roleStatus.incStarted(); @@ -905,7 +899,7 @@ public class AppState { */ public List<RoleStatus> cloneRoleStatusList() { Collection<RoleStatus> statuses = roleStatusMap.values(); - List<RoleStatus> statusList = new ArrayList<RoleStatus>(statuses.size()); + List<RoleStatus> statusList = new ArrayList<>(statuses.size()); try { for (RoleStatus status : statuses) { statusList.add((RoleStatus)(status.clone())); @@ -1481,9 +1475,9 @@ public class AppState { log.info("Container was queued for release : {}", containerId); Container container = containersBeingReleased.remove(containerId); RoleStatus roleStatus = lookupRoleStatus(container); - int releasing = roleStatus.decReleasing(); - int actual = roleStatus.decActual(); - int completedCount = roleStatus.incCompleted(); + long releasing = roleStatus.decReleasing(); + long actual = roleStatus.decActual(); + long completedCount = roleStatus.incCompleted(); log.info("decrementing role count for role {} to {}; releasing={}, completed={}", roleStatus.getName(), actual, @@ -1620,7 +1614,7 @@ public class AppState { */ public synchronized float getApplicationProgressPercentage() { float percentage; - int desired = 0; + long desired = 0; float actual = 0; for (RoleStatus role : getRoleStatusMap().values()) { desired += role.getDesired(); @@ -1866,8 +1860,8 @@ public class AppState { private List<AbstractRMOperation> reviewOneRole(RoleStatus role) throws SliderInternalStateException, TriggerClusterTeardownException { List<AbstractRMOperation> operations = new ArrayList<>(); - int delta; - int expected; + long delta; + long expected; String name = role.getName(); synchronized (role) { delta = role.getDelta(); @@ -1909,13 +1903,13 @@ public class AppState { // reduce the number expected (i.e. subtract the delta) // then pick some containers to kill - int excess = -delta; + long excess = -delta; // how many requests are outstanding - int outstandingRequests = role.getRequested(); + long outstandingRequests = role.getRequested(); if (outstandingRequests > 0) { // outstanding requests. - int toCancel = Math.min(outstandingRequests, excess); + int toCancel = (int)Math.min(outstandingRequests, excess); // Delegate to Role History @@ -1972,13 +1966,12 @@ public class AppState { // ask the release selector to sort the targets containersToRelease = containerReleaseSelector.sortCandidates( roleId, - containersToRelease, - excess); + containersToRelease); //crop to the excess List<RoleInstance> finalCandidates = (excess < numberAvailableForRelease) - ? containersToRelease.subList(0, excess) + ? containersToRelease.subList(0, (int)excess) : containersToRelease; @@ -2085,9 +2078,8 @@ public class AppState { List<Container> ordered = roleHistory.prepareAllocationList(allocatedContainers); log.debug("onContainersAllocated(): Total containers allocated = {}", ordered.size()); for (Container container : ordered) { - String containerHostInfo = container.getNodeId().getHost() - + ":" + - container.getNodeId().getPort(); + final NodeId nodeId = container.getNodeId(); + String containerHostInfo = nodeId.getHost() + ":" + nodeId.getPort(); //get the role final ContainerId cid = container.getId(); final RoleStatus role = lookupRoleStatus(container); @@ -2097,11 +2089,11 @@ public class AppState { //inc allocated count -this may need to be dropped in a moment, // but us needed to update the logic below - final int allocated = role.incActual(); - final int desired = role.getDesired(); + final long allocated = role.incActual(); + final long desired = role.getDesired(); final String roleName = role.getName(); - final ContainerAllocation allocation = + final ContainerAllocationResults allocation = roleHistory.onContainerAllocated(container, desired, allocated); final ContainerAllocationOutcome outcome = allocation.outcome; @@ -2128,8 +2120,8 @@ public class AppState { " on {}:{},", roleName, cid, - container.getNodeId().getHost(), - container.getNodeId().getPort() + nodeId.getHost(), + nodeId.getPort() ); assignments.add(new ContainerAssignment(container, role, outcome)); http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5b7f6dde/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerAllocation.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerAllocation.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerAllocation.java deleted file mode 100644 index 6bfe8ab..0000000 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerAllocation.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.slider.server.appmaster.state; - -import org.apache.slider.server.appmaster.operations.AbstractRMOperation; - -import java.util.ArrayList; -import java.util.List; - -/** - * This is just a tuple of the outcome of a container allocation - */ -public class ContainerAllocation { - - /** - * What was the outcome of this allocation: placed, escalated, ... - */ - public ContainerAllocationOutcome outcome; - - /** - * The outstanding request which originated this. - * This will be null if the outcome is {@link ContainerAllocationOutcome#Unallocated} - * as it wasn't expected. - */ - public OutstandingRequest origin; - - /** - * A possibly empty list of requests to add to the follow-up actions - */ - public List<AbstractRMOperation> operations = new ArrayList<>(0); - - public ContainerAllocation() { - } -} http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5b7f6dde/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerAllocationResults.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerAllocationResults.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerAllocationResults.java new file mode 100644 index 0000000..e80639e --- /dev/null +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerAllocationResults.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.server.appmaster.state; + +import org.apache.slider.server.appmaster.operations.AbstractRMOperation; + +import java.util.ArrayList; +import java.util.List; + +/** + * This is just a tuple of the outcome of a container allocation + */ +public class ContainerAllocationResults { + + /** + * What was the outcome of this allocation: placed, escalated, ... + */ + public ContainerAllocationOutcome outcome; + + /** + * The outstanding request which originated this. + * This will be null if the outcome is {@link ContainerAllocationOutcome#Unallocated} + * as it wasn't expected. + */ + public OutstandingRequest origin; + + /** + * A possibly empty list of requests to add to the follow-up actions + */ + public List<AbstractRMOperation> operations = new ArrayList<>(0); + + public ContainerAllocationResults() { + } +} http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5b7f6dde/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerReleaseSelector.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerReleaseSelector.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerReleaseSelector.java index 0cbc134..fafbada 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerReleaseSelector.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerReleaseSelector.java @@ -30,9 +30,8 @@ public interface ContainerReleaseSelector { * Given a list of candidate containers, return a sorted version of the priority * in which they should be released. * @param candidates candidate list ... everything considered suitable - * @return + * @return the list of candidates */ List<RoleInstance> sortCandidates(int roleId, - List<RoleInstance> candidates, - int minimumToSelect); + List<RoleInstance> candidates); } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5b7f6dde/slider-core/src/main/java/org/apache/slider/server/appmaster/state/MostRecentContainerReleaseSelector.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/MostRecentContainerReleaseSelector.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/MostRecentContainerReleaseSelector.java index 9d936a1..38c5b8e 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/MostRecentContainerReleaseSelector.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/MostRecentContainerReleaseSelector.java @@ -32,8 +32,7 @@ public class MostRecentContainerReleaseSelector implements ContainerReleaseSelec @Override public List<RoleInstance> sortCandidates(int roleId, - List<RoleInstance> candidates, - int minimumToSelect) { + List<RoleInstance> candidates) { Collections.sort(candidates, new newerThan()); return candidates; } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5b7f6dde/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java index bf34d43..a791826 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java @@ -127,13 +127,13 @@ public class OutstandingRequestTracker { * @param hostname hostname * @return the allocation outcome */ - public synchronized ContainerAllocation onContainerAllocated(int role, + public synchronized ContainerAllocationResults onContainerAllocated(int role, String hostname, Container container) { final String containerDetails = SliderUtils.containerToString(container); log.debug("Processing allocation for role {} on {}", role, containerDetails); - ContainerAllocation allocation = new ContainerAllocation(); + ContainerAllocationResults allocation = new ContainerAllocationResults(); ContainerAllocationOutcome outcome; OutstandingRequest request = placedRequests.remove(new OutstandingRequest(role, hostname)); if (request != null) { http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5b7f6dde/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java index 34340a2..c93c7f5 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java @@ -71,6 +71,8 @@ public class RoleHistory { protected static final Logger log = LoggerFactory.getLogger(RoleHistory.class); private final List<ProviderRole> providerRoles; + /** the roles in here are shared with App State */ + private final Map<Integer, RoleStatus> roleStatusMap = new HashMap<>(); private long startTime; /** Time when saved */ @@ -110,10 +112,17 @@ public class RoleHistory { */ private Set<String> failedNodes = new HashSet<>(); - - public RoleHistory(List<ProviderRole> providerRoles) throws BadConfigException { - this.providerRoles = providerRoles; - roleSize = providerRoles.size(); + /** + * Instantiate + * @param roles initial role list + * @throws BadConfigException + */ + public RoleHistory(Collection<RoleStatus> roles) throws BadConfigException { + roleSize = roles.size(); + providerRoles = new ArrayList<>(roleSize); + for (RoleStatus role : roles) { + addNewRole(role); + } reset(); } @@ -126,13 +135,7 @@ public class RoleHistory { nodemap = new NodeMap(roleSize); failedNodes = new HashSet<>(); resetAvailableNodeLists(); - outstandingRequests = new OutstandingRequestTracker(); - - Map<Integer, RoleStatus> roleStats = new HashMap<>(); - for (ProviderRole providerRole : providerRoles) { - checkProviderRole(roleStats, providerRole); - } } /** @@ -148,45 +151,33 @@ public class RoleHistory { } /** - * safety check: make sure the provider role is unique amongst + * safety check: make sure the role is unique amongst * the role stats...which is extended with the new role - * @param roleStats role stats - * @param providerRole role + * @param roleStatus role * @throws ArrayIndexOutOfBoundsException * @throws BadConfigException */ - protected void checkProviderRole(Map<Integer, RoleStatus> roleStats, - ProviderRole providerRole) - throws BadConfigException { - int index = providerRole.id; + protected void putRole(RoleStatus roleStatus) throws BadConfigException { + int index = roleStatus.getKey(); if (index < 0) { - throw new BadConfigException("Provider " + providerRole + " id is out of range"); + throw new BadConfigException("Provider " + roleStatus + " id is out of range"); } - if (roleStats.get(index) != null) { + if (roleStatusMap.get(index) != null) { throw new BadConfigException( - providerRole.toString() + " id duplicates that of " + - roleStats.get(index)); + roleStatus.toString() + " id duplicates that of " + + roleStatusMap.get(index)); } - roleStats.put(index, new RoleStatus(providerRole)); + roleStatusMap.put(index, roleStatus); } /** - * Add a new provider role to the map - * @param providerRole new provider role + * Add a new role + * @param roleStatus new role */ - public void addNewProviderRole(ProviderRole providerRole) - throws BadConfigException { - log.debug("Validating/adding new provider role to role history: {} ", - providerRole); - Map<Integer, RoleStatus> roleStats = new HashMap<>(); - - for (ProviderRole role : providerRoles) { - roleStats.put(role.id, new RoleStatus(role)); - } - - checkProviderRole(roleStats, providerRole); - log.debug("Check successful; adding role"); - this.providerRoles.add(providerRole); + public void addNewRole(RoleStatus roleStatus) throws BadConfigException { + log.debug("Validating/adding new role to role history: {} ", roleStatus); + putRole(roleStatus); + this.providerRoles.add(roleStatus.getProviderRole()); } /** @@ -713,13 +704,14 @@ public class RoleHistory { * @param actualCount current count of instances * @return The allocation outcome */ - public synchronized ContainerAllocation onContainerAllocated(Container container, - int desiredCount, - int actualCount) { + public synchronized ContainerAllocationResults onContainerAllocated(Container container, + long desiredCount, + long actualCount) { int role = ContainerPriority.extractRole(container); + String hostname = RoleHistoryUtils.hostnameOf(container); List<NodeInstance> nodeInstances = listRecentNodesForRoleId(role); - ContainerAllocation outcome = + ContainerAllocationResults outcome = outstandingRequests.onContainerAllocated(role, hostname, container); if (desiredCount <= actualCount) { // all outstanding requests have been satisfied @@ -732,7 +724,7 @@ public class RoleHistory { sortRecentNodeList(role); } } - // AA placement: now request a new node + // TODO: AA placement: now request a new node return outcome; } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5b7f6dde/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java index 20f5802..4197c4f 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java @@ -21,17 +21,20 @@ package org.apache.slider.server.appmaster.state; import org.apache.slider.api.types.ComponentInformation; import org.apache.slider.providers.PlacementPolicy; import org.apache.slider.providers.ProviderRole; +import org.apache.slider.server.appmaster.management.LongGauge; import java.io.Serializable; import java.util.Comparator; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; - /** - * Models the ongoing status of all nodes in - * Nothing here is synchronized: grab the whole instance to update. + * Models the ongoing status of all nodes in an application. + * + * These structures are shared across the {@link AppState} and {@link RoleHistory} structures, + * and must be designed for synchronous access. Atomic counters are preferred to anything which + * requires synchronization. Where synchronized access is good is that it allows for + * the whole instance to be locked, for updating multiple entries. */ public final class RoleStatus implements Cloneable { @@ -43,13 +46,19 @@ public final class RoleStatus implements Cloneable { private final int key; private final ProviderRole providerRole; - private int desired, actual, requested, releasing; - private int failed, startFailed; - private int started, completed, totalRequested; - private final AtomicLong preempted = new AtomicLong(0); - private final AtomicLong nodeFailed = new AtomicLong(0); - private final AtomicLong failedRecently = new AtomicLong(0); - private final AtomicLong limitsExceeded = new AtomicLong(0); + private final LongGauge desired = new LongGauge(); + private final LongGauge actual = new LongGauge(); + private final LongGauge requested = new LongGauge(); + private final LongGauge releasing = new LongGauge(); + private final LongGauge failed = new LongGauge(); + private final LongGauge startFailed = new LongGauge(); + private final LongGauge started= new LongGauge(); + private final LongGauge completed = new LongGauge(); + private final LongGauge totalRequested = new LongGauge(); + private final LongGauge preempted = new LongGauge(0); + private final LongGauge nodeFailed = new LongGauge(0); + private final LongGauge failedRecently = new LongGauge(0); + private final LongGauge limitsExceeded = new LongGauge(0); /** flag set to true if there is an outstanding anti-affine request */ private final AtomicBoolean pendingAARequest = new AtomicBoolean(false); @@ -125,63 +134,61 @@ public final class RoleStatus implements Cloneable { return !hasPlacementPolicy(PlacementPolicy.NO_DATA_LOCALITY); } - public synchronized int getDesired() { - return desired; + public long getDesired() { + return desired.get(); } - public synchronized void setDesired(int desired) { - this.desired = desired; + public void setDesired(long desired) { + this.desired.set(desired); } - public synchronized int getActual() { - return actual; + public long getActual() { + return actual.get(); } - public synchronized int incActual() { - return ++actual; + public long incActual() { + return actual.incrementAndGet(); } - public synchronized int decActual() { - actual = Math.max(0, actual - 1); - return actual; + public long decActual() { + return actual.decToFloor(1); } - public synchronized int getRequested() { - return requested; + public long getRequested() { + return requested.get(); } - public synchronized int incRequested() { - totalRequested++; - return ++requested; + public long incRequested() { + totalRequested.incrementAndGet(); + return requested.incrementAndGet(); } - public synchronized int cancel(int count) { - requested = Math.max(0, requested - count); - return requested; + + public long cancel(long count) { + return requested.decToFloor(count); } - public synchronized int decRequested() { - return cancel(1); + public void decRequested() { + cancel(1); } - public synchronized int getReleasing() { - return releasing; + public long getReleasing() { + return releasing.get(); } - public synchronized int incReleasing() { - return ++releasing; + public long incReleasing() { + return releasing.incrementAndGet(); } - public synchronized int decReleasing() { - releasing = Math.max(0, releasing - 1); - return releasing; + public long decReleasing() { + return releasing.decToFloor(1); } - public synchronized int getFailed() { - return failed; + public long getFailed() { + return failed.get(); } - public synchronized long getFailedRecently() { + public long getFailedRecently() { return failedRecently.get(); } @@ -217,7 +224,7 @@ public final class RoleStatus implements Cloneable { case Node_failure: nodeFailed.incrementAndGet(); - failed++; + failed.incrementAndGet(); break; case Failed_limits_exceeded: // exceeded memory or CPU; app/configuration related @@ -225,7 +232,7 @@ public final class RoleStatus implements Cloneable { // fall through case Failed: // application failure, possibly node related, possibly not default: // anything else (future-proofing) - failed++; + failed.incrementAndGet(); failedRecently.incrementAndGet(); //have a look to see if it short lived if (startupFailure) { @@ -235,39 +242,39 @@ public final class RoleStatus implements Cloneable { } } - public synchronized int getStartFailed() { - return startFailed; + public long getStartFailed() { + return startFailed.get(); } public synchronized void incStartFailed() { - startFailed++; + startFailed.getAndIncrement(); } public synchronized String getFailureMessage() { return failureMessage; } - public synchronized int getCompleted() { - return completed; + public long getCompleted() { + return completed.get(); } public synchronized void setCompleted(int completed) { - this.completed = completed; + this.completed.set(completed); } - public synchronized int incCompleted() { - return completed ++; + public long incCompleted() { + return completed.incrementAndGet(); } - public synchronized int getStarted() { - return started; + public long getStarted() { + return started.get(); } public synchronized void incStarted() { - started++; + started.incrementAndGet(); } - public synchronized int getTotalRequested() { - return totalRequested; + public long getTotalRequested() { + return totalRequested.get(); } public long getPreempted() { @@ -284,13 +291,13 @@ public final class RoleStatus implements Cloneable { * @return the positive or negative number of roles to add/release. * 0 means "do nothing". */ - public synchronized int getDelta() { - int inuse = getActualAndRequested(); + public long getDelta() { + long inuse = getActualAndRequested(); //don't know how to view these. Are they in-use or not? - int delta = desired - inuse; + long delta = desired.get() - inuse; if (delta < 0) { //if we are releasing, remove the number that are already released. - delta += releasing; + delta += releasing.get(); //but never switch to a positive delta = Math.min(delta, 0); } @@ -301,8 +308,8 @@ public final class RoleStatus implements Cloneable { * Get count of actual and requested containers * @return the size of the application when outstanding requests are included */ - public synchronized int getActualAndRequested() { - return actual + requested; + public long getActualAndRequested() { + return actual.get() + requested.get(); } @Override @@ -357,15 +364,15 @@ public final class RoleStatus implements Cloneable { ComponentInformation info = new ComponentInformation(); info.name = name; info.priority = getPriority(); - info.desired = desired; - info.actual = actual; - info.requested = requested; - info.releasing = releasing; - info.failed = failed; - info.startFailed = startFailed; + info.desired = desired.intValue(); + info.actual = actual.intValue(); + info.requested = requested.intValue(); + info.releasing = releasing.intValue(); + info.failed = failed.intValue(); + info.startFailed = startFailed.intValue(); info.placementPolicy = getPlacementPolicy(); info.failureMessage = failureMessage; - info.totalRequested = totalRequested; + info.totalRequested = totalRequested.intValue(); info.failedRecently = failedRecently.intValue(); info.nodeFailed = nodeFailed.intValue(); info.preempted = preempted.intValue(); http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5b7f6dde/slider-core/src/main/java/org/apache/slider/server/appmaster/state/SimpleReleaseSelector.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/SimpleReleaseSelector.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/SimpleReleaseSelector.java index b7f0e05..b848096 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/SimpleReleaseSelector.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/SimpleReleaseSelector.java @@ -27,8 +27,7 @@ public class SimpleReleaseSelector implements ContainerReleaseSelector { @Override public List<RoleInstance> sortCandidates(int roleId, - List<RoleInstance> candidates, - int minimumToSelect) { + List<RoleInstance> candidates) { return candidates; } } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5b7f6dde/slider-core/src/main/java/org/apache/slider/server/appmaster/web/view/ContainerStatsBlock.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/web/view/ContainerStatsBlock.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/web/view/ContainerStatsBlock.java index 0896e2b..65d8b39 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/web/view/ContainerStatsBlock.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/web/view/ContainerStatsBlock.java @@ -51,6 +51,7 @@ public class ContainerStatsBlock extends HtmlBlock { // Some functions that help transform the data into an object we can use to abstract presentation specifics protected static final Function<Entry<String,Integer>,Entry<TableContent,Integer>> stringIntPairFunc = toTableContentFunction(); + protected static final Function<Entry<String,Long>,Entry<TableContent,Long>> stringLongPairFunc = toTableContentFunction(); protected static final Function<Entry<String,String>,Entry<TableContent,String>> stringStringPairFunc = toTableContentFunction(); private WebAppApi slider; @@ -108,7 +109,7 @@ public class ContainerStatsBlock extends HtmlBlock { DIV<Hamlet> div = html.div("role-info ui-widget-content ui-corner-all"); List<ClusterNode> nodesInRole = - new ArrayList<ClusterNode>(clusterNodesInRole.values()); + new ArrayList<>(clusterNodesInRole.values()); div.h2(BOLD, StringUtils.capitalize(name)); http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5b7f6dde/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateAAPlacement.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateAAPlacement.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateAAPlacement.groovy index 810affc..157870a 100644 --- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateAAPlacement.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateAAPlacement.groovy @@ -55,41 +55,6 @@ class TestMockAppStateAAPlacement extends BaseMockAppStateTest }*/ /** - * Get the container request of an indexed entry. Includes some assertions for better diagnostics - * @param ops operation list - * @param index index in the list - * @return the request. - */ - AMRMClient.ContainerRequest getRequest(List<AbstractRMOperation> ops, int index) { - assert index < ops.size() - def op = ops[index] - assert op instanceof ContainerRequestOperation - ((ContainerRequestOperation) op).request - } - - /** - * Get the cancel request of an indexed entry. Includes some assertions for better diagnostics - * @param ops operation list - * @param index index in the list - * @return the request. - */ - AMRMClient.ContainerRequest getCancel(List<AbstractRMOperation> ops, int index) { - assert index < ops.size() - def op = ops[index] - assert op instanceof CancelSingleRequest - ((CancelSingleRequest) op).request - } - - /** - * Get the single request of a list of operations; includes the check for the size - * @param ops operations list of size 1 - * @return the request within the first ContainerRequestOperation - */ - public AMRMClient.ContainerRequest getSingleRequest(List<AbstractRMOperation> ops) { - assert 1 == ops.size() - getRequest(ops, 0) - } - /** * Get the single request of a list of operations; includes the check for the size * @param ops operations list of size 1 * @return the request within the first operation @@ -123,10 +88,6 @@ class TestMockAppStateAAPlacement extends BaseMockAppStateTest Container allocated = engine.allocateContainer(request) - // node is allocated wherever - - def firstAllocation = allocated.nodeId - // notify the container ane expect List<ContainerAssignment> assignments = []; List<AbstractRMOperation> operations = [] @@ -142,9 +103,10 @@ class TestMockAppStateAAPlacement extends BaseMockAppStateTest // we also expect a new allocation request to have been issued def req2 = getRequest(operations, 1) - // now the nodes should be a list Container allocated2 = engine.allocateContainer(req2) + // placement must be on a different host + assert allocated2.nodeId != allocated.nodeId ContainerAssignment assigned = assignments[0] Container container = assigned.container http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5b7f6dde/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateDynamicHistory.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateDynamicHistory.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateDynamicHistory.groovy index c62eb72..e57f341 100644 --- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateDynamicHistory.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateDynamicHistory.groovy @@ -20,7 +20,6 @@ package org.apache.slider.server.appmaster.model.appstate import groovy.transform.CompileStatic import groovy.util.logging.Slf4j -import org.apache.hadoop.conf.Configuration import org.apache.hadoop.yarn.api.records.ContainerId import org.apache.slider.api.ResourceKeys import org.apache.slider.core.conf.ConfTreeOperations @@ -28,16 +27,14 @@ import org.apache.slider.core.exceptions.BadConfigException import org.apache.slider.providers.PlacementPolicy import org.apache.slider.providers.ProviderRole import org.apache.slider.server.appmaster.model.mock.BaseMockAppStateTest -import org.apache.slider.server.appmaster.model.mock.MockAppState import org.apache.slider.server.appmaster.model.mock.MockRoleHistory import org.apache.slider.server.appmaster.model.mock.MockRoles import org.apache.slider.server.appmaster.model.mock.MockYarnEngine import org.apache.slider.server.appmaster.operations.ContainerRequestOperation import org.apache.slider.server.appmaster.state.AppState -import org.apache.slider.server.appmaster.state.AppStateBindingInfo import org.apache.slider.server.appmaster.state.NodeInstance import org.apache.slider.server.appmaster.state.RoleInstance -import org.apache.slider.server.appmaster.state.SimpleReleaseSelector +import org.apache.slider.server.appmaster.state.RoleStatus import org.junit.Test /** @@ -191,21 +188,16 @@ class TestMockAppStateDynamicHistory extends BaseMockAppStateTest @Test(expected = BadConfigException.class) public void testRoleHistoryRoleAdditions() throws Throwable { MockRoleHistory roleHistory = new MockRoleHistory([]) - roleHistory.addNewProviderRole(new ProviderRole("one", 1)) - roleHistory.addNewProviderRole(new ProviderRole("two", 1)) + roleHistory.addNewRole(new RoleStatus(new ProviderRole("one", 1))) + roleHistory.addNewRole(new RoleStatus(new ProviderRole("two", 1))) roleHistory.dump() - fail("should have raised an exception") } - - + @Test(expected = BadConfigException.class) public void testRoleHistoryRoleStartupConflict() throws Throwable { MockRoleHistory roleHistory = new MockRoleHistory([ new ProviderRole("one", 1), new ProviderRole("two", 1) ]) roleHistory.dump() - fail("should have raised an exception") } - - } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5b7f6dde/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateDynamicRoles.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateDynamicRoles.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateDynamicRoles.groovy index 05b38ab..d0163d2 100644 --- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateDynamicRoles.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateDynamicRoles.groovy @@ -98,21 +98,16 @@ class TestMockAppStateDynamicRoles extends BaseMockAppStateTest * @param actions source list * @return found list */ - List<ContainerRequestOperation> findAllocationsForRole(int role, + Collection<ContainerRequestOperation> findAllocationsForRole(int role, List<AbstractRMOperation> actions) { - List <ContainerRequestOperation > results = [] - actions.each { AbstractRMOperation operation -> - if (operation instanceof ContainerRequestOperation) { - def req = (ContainerRequestOperation) operation; - def reqrole = ContainerPriority.extractRole(req.request.priority) - if (role == reqrole) { - results << req - } - } + def requests = actions.findAll { + it instanceof ContainerRequestOperation}.collect {it as ContainerRequestOperation} + + requests.findAll { + role == ContainerPriority.extractRole(it.request.priority) } - return results - } - + } + @Test public void testStrictPlacementInitialRequest() throws Throwable { log.info("Initial engine state = $engine") @@ -124,7 +119,6 @@ class TestMockAppStateDynamicRoles extends BaseMockAppStateTest assertRelaxLocalityFlag(ID5, null, true, actions) } - @Test public void testPolicyPropagation() throws Throwable { assert !(appState.lookupRoleStatus(ROLE4).placementPolicy & PlacementPolicy.STRICT) @@ -136,7 +130,6 @@ class TestMockAppStateDynamicRoles extends BaseMockAppStateTest public void testNodeFailureThresholdPropagation() throws Throwable { assert (appState.lookupRoleStatus(ROLE4).nodeFailureThreshold == 3) assert (appState.lookupRoleStatus(ROLE5).nodeFailureThreshold == 2) - } @Test @@ -156,7 +149,6 @@ class TestMockAppStateDynamicRoles extends BaseMockAppStateTest assert instanceA def hostname = RoleHistoryUtils.hostnameOf(instanceA.container) - log.info("Allocated engine state = $engine") assert engine.containerCount() == 1 @@ -166,8 +158,7 @@ class TestMockAppStateDynamicRoles extends BaseMockAppStateTest role4.desired = 0 appState.lookupRoleStatus(ROLE4).desired = 0 def completionResults = [] - def containersToRelease = [] - instances = createStartAndStopNodes(completionResults) + createStartAndStopNodes(completionResults) assert engine.containerCount() == 0 assert completionResults.size() == 1 @@ -198,19 +189,16 @@ class TestMockAppStateDynamicRoles extends BaseMockAppStateTest } assert instanceA def hostname = RoleHistoryUtils.hostnameOf(instanceA.container) - - log.info("Allocated engine state = $engine") assert engine.containerCount() == 1 assert role5.actual == 1 - // shrinking cluster + // shrinking cluster role5.desired = 0 def completionResults = [] - def containersToRelease = [] - instances = createStartAndStopNodes(completionResults) + createStartAndStopNodes(completionResults) assert engine.containerCount() == 0 assert completionResults.size() == 1 assert role5.actual == 0 @@ -223,16 +211,15 @@ class TestMockAppStateDynamicRoles extends BaseMockAppStateTest def nodes = cro.request.nodes assert nodes.size() == 1 assert hostname == nodes[0] - } public void assertRelaxLocalityFlag( - int id, + int role, String expectedHost, boolean expectedRelaxFlag, List<AbstractRMOperation> actions) { def requests - requests = findAllocationsForRole(id, actions) + requests = findAllocationsForRole(role, actions) assert requests.size() == 1 def req = requests[0] assert expectedRelaxFlag == req.request.relaxLocality http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5b7f6dde/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryContainerEvents.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryContainerEvents.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryContainerEvents.groovy index 8ab63aa..5609682 100644 --- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryContainerEvents.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryContainerEvents.groovy @@ -33,6 +33,7 @@ import org.apache.slider.server.appmaster.model.mock.BaseMockAppStateTest import org.apache.slider.server.appmaster.model.mock.MockContainer import org.apache.slider.server.appmaster.model.mock.MockFactory import org.apache.slider.server.appmaster.model.mock.MockNodeId +import org.apache.slider.server.appmaster.model.mock.MockRoleHistory import org.apache.slider.server.appmaster.state.* import org.junit.Before import org.junit.Test @@ -60,7 +61,7 @@ class TestRoleHistoryContainerEvents extends BaseMockAppStateTest { String roleName = "test" List<NodeInstance> nodes = [age2Active2, age2Active0, age4Active1, age1Active4, age3Active0] - RoleHistory roleHistory = new RoleHistory(MockFactory.ROLES) + RoleHistory roleHistory = new MockRoleHistory(MockFactory.ROLES) Resource resource http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5b7f6dde/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryFindNodesForNewInstances.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryFindNodesForNewInstances.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryFindNodesForNewInstances.groovy index c4768ec..63aa6d2 100644 --- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryFindNodesForNewInstances.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryFindNodesForNewInstances.groovy @@ -23,6 +23,7 @@ import groovy.util.logging.Slf4j import org.apache.slider.providers.ProviderRole import org.apache.slider.server.appmaster.model.mock.BaseMockAppStateTest import org.apache.slider.server.appmaster.model.mock.MockFactory +import org.apache.slider.server.appmaster.model.mock.MockRoleHistory import org.apache.slider.server.appmaster.state.ContainerOutcome import org.apache.slider.server.appmaster.state.NodeInstance import org.apache.slider.server.appmaster.state.RoleHistory @@ -54,7 +55,7 @@ class TestRoleHistoryFindNodesForNewInstances extends BaseMockAppStateTest { NodeInstance empty = new NodeInstance("empty", MockFactory.ROLE_COUNT) List<NodeInstance> nodes = [age2Active2, age2Active0, age4Active1, age1Active4, age3Active0] - RoleHistory roleHistory = new RoleHistory(MockFactory.ROLES) + RoleHistory roleHistory = new MockRoleHistory(MockFactory.ROLES) String roleName = "test" RoleStatus roleStat = new RoleStatus(new ProviderRole(roleName, 0)) http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5b7f6dde/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryNIComparators.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryNIComparators.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryNIComparators.groovy index ee910e4..bcd8f9f 100644 --- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryNIComparators.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryNIComparators.groovy @@ -40,6 +40,7 @@ class TestRoleHistoryNIComparators extends BaseMockAppStateTest { NodeInstance age1failing = nodeInstance(1001, 0, 0, 0) List<NodeInstance> nodes = [age2Active2, age4Active1, age1Active4, age3Active0] + List<NodeInstance> nodesPlusEmpty = [age2Active2, age4Active1, age1Active4, age3Active0, empty] List<NodeInstance> allnodes = [age6failing, age2Active2, age4Active1, age1Active4, age3Active0, age1failing] @Before @@ -80,9 +81,8 @@ class TestRoleHistoryNIComparators extends BaseMockAppStateTest { @Test public void testNewerThanNoRole() throws Throwable { - nodes << empty - Collections.sort(nodes, new NodeInstance.Preferred(0)) - assertListEquals(nodes, [age4Active1, age3Active0, age2Active2, age1Active4, empty]) + Collections.sort(nodesPlusEmpty, new NodeInstance.Preferred(0)) + assertListEquals(nodesPlusEmpty, [age4Active1, age3Active0, age2Active2, age1Active4, empty]) } @Test @@ -94,9 +94,9 @@ class TestRoleHistoryNIComparators extends BaseMockAppStateTest { @Test public void testMoreActiveThanEmpty() throws Throwable { - nodes << empty - Collections.sort(nodes, new NodeInstance.MoreActiveThan(0)) - assertListEquals(nodes, [age1Active4, age2Active2, age4Active1, age3Active0, empty]) + + Collections.sort(nodesPlusEmpty, new NodeInstance.MoreActiveThan(0)) + assertListEquals(nodesPlusEmpty, [age1Active4, age2Active2, age4Active1, age3Active0, empty]) } } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5b7f6dde/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryRW.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryRW.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryRW.groovy index 254c0b6..72e4240 100644 --- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryRW.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryRW.groovy @@ -27,6 +27,7 @@ import org.apache.slider.providers.PlacementPolicy import org.apache.slider.providers.ProviderRole import org.apache.slider.server.appmaster.model.mock.BaseMockAppStateTest import org.apache.slider.server.appmaster.model.mock.MockFactory +import org.apache.slider.server.appmaster.model.mock.MockRoleHistory import org.apache.slider.server.appmaster.state.NodeEntry import org.apache.slider.server.appmaster.state.NodeInstance import org.apache.slider.server.appmaster.state.RoleHistory @@ -57,7 +58,7 @@ class TestRoleHistoryRW extends BaseMockAppStateTest { @Test public void testWriteReadEmpty() throws Throwable { - RoleHistory roleHistory = new RoleHistory(MockFactory.ROLES) + RoleHistory roleHistory = new MockRoleHistory(MockFactory.ROLES) roleHistory.onStart(fs, historyPath) Path history = roleHistory.saveHistory(time++) assert fs.isFile(history) @@ -67,7 +68,7 @@ class TestRoleHistoryRW extends BaseMockAppStateTest { @Test public void testWriteReadData() throws Throwable { - RoleHistory roleHistory = new RoleHistory(MockFactory.ROLES) + RoleHistory roleHistory = new MockRoleHistory(MockFactory.ROLES) assert !roleHistory.onStart(fs, historyPath) String addr = "localhost" NodeInstance instance = roleHistory.getOrCreateNodeInstance(addr) @@ -77,7 +78,7 @@ class TestRoleHistoryRW extends BaseMockAppStateTest { Path history = roleHistory.saveHistory(time++) assert fs.isFile(history) RoleHistoryWriter historyWriter = new RoleHistoryWriter(); - RoleHistory rh2 = new RoleHistory(MockFactory.ROLES) + RoleHistory rh2 = new MockRoleHistory(MockFactory.ROLES) def loadedRoleHistory = historyWriter.read(fs, history) @@ -92,7 +93,7 @@ class TestRoleHistoryRW extends BaseMockAppStateTest { @Test public void testWriteReadActiveData() throws Throwable { - RoleHistory roleHistory = new RoleHistory(MockFactory.ROLES) + RoleHistory roleHistory = new MockRoleHistory(MockFactory.ROLES) roleHistory.onStart(fs, historyPath) String addr = "localhost" String addr2 = "rack1server5" @@ -117,7 +118,7 @@ class TestRoleHistoryRW extends BaseMockAppStateTest { describe("Loaded") log.info("testWriteReadActiveData in $history") RoleHistoryWriter historyWriter = new RoleHistoryWriter(); - RoleHistory rh2 = new RoleHistory(MockFactory.ROLES) + RoleHistory rh2 = new MockRoleHistory(MockFactory.ROLES) def loadedRoleHistory = historyWriter.read(fs, history) assert 3 == loadedRoleHistory.size() rh2.rebuild(loadedRoleHistory) @@ -154,7 +155,7 @@ class TestRoleHistoryRW extends BaseMockAppStateTest { @Test public void testWriteThaw() throws Throwable { - RoleHistory roleHistory = new RoleHistory(MockFactory.ROLES) + RoleHistory roleHistory = new MockRoleHistory(MockFactory.ROLES) assert !roleHistory.onStart(fs, historyPath) String addr = "localhost" NodeInstance instance = roleHistory.getOrCreateNodeInstance(addr) @@ -164,7 +165,7 @@ class TestRoleHistoryRW extends BaseMockAppStateTest { Path history = roleHistory.saveHistory(time++) long savetime =roleHistory.saveTime; assert fs.isFile(history) - RoleHistory rh2 = new RoleHistory(MockFactory.ROLES) + RoleHistory rh2 = new MockRoleHistory(MockFactory.ROLES) assert rh2.onStart(fs, historyPath) NodeInstance ni2 = rh2.getExistingNodeInstance(addr) assert ni2 != null @@ -211,7 +212,7 @@ class TestRoleHistoryRW extends BaseMockAppStateTest { @Test public void testSkipEmptyFileOnRead() throws Throwable { describe "verify that empty histories are skipped on read; old histories purged" - RoleHistory roleHistory = new RoleHistory(MockFactory.ROLES) + RoleHistory roleHistory = new MockRoleHistory(MockFactory.ROLES) roleHistory.onStart(fs, historyPath) time = 0 Path oldhistory = roleHistory.saveHistory(time++) @@ -226,7 +227,7 @@ class TestRoleHistoryRW extends BaseMockAppStateTest { RoleHistoryWriter historyWriter = new RoleHistoryWriter(); Path touched = touch(historyWriter, time++) - RoleHistory rh2 = new RoleHistory(MockFactory.ROLES) + RoleHistory rh2 = new MockRoleHistory(MockFactory.ROLES) assert rh2.onStart(fs, historyPath) NodeInstance ni2 = rh2.getExistingNodeInstance(addr) assert ni2 != null @@ -240,7 +241,7 @@ class TestRoleHistoryRW extends BaseMockAppStateTest { @Test public void testSkipBrokenFileOnRead() throws Throwable { describe "verify that empty histories are skipped on read; old histories purged" - RoleHistory roleHistory = new RoleHistory(MockFactory.ROLES) + RoleHistory roleHistory = new MockRoleHistory(MockFactory.ROLES) roleHistory.onStart(fs, historyPath) time = 0 Path oldhistory = roleHistory.saveHistory(time++) @@ -258,7 +259,7 @@ class TestRoleHistoryRW extends BaseMockAppStateTest { out.writeBytes("{broken:true}") out.close() - RoleHistory rh2 = new RoleHistory(MockFactory.ROLES) + RoleHistory rh2 = new MockRoleHistory(MockFactory.ROLES) describe("IGNORE STACK TRACE BELOW") assert rh2.onStart(fs, historyPath) @@ -285,7 +286,7 @@ class TestRoleHistoryRW extends BaseMockAppStateTest { def loadedRoleHistory = writer.read(source) assert 4 == loadedRoleHistory.size() - RoleHistory roleHistory = new RoleHistory(MockFactory.ROLES) + RoleHistory roleHistory = new MockRoleHistory(MockFactory.ROLES) assert 0 == roleHistory.rebuild(loadedRoleHistory) } @@ -300,7 +301,7 @@ class TestRoleHistoryRW extends BaseMockAppStateTest { def loadedRoleHistory = writer.read(source) assert 6 == loadedRoleHistory.size() - RoleHistory roleHistory = new RoleHistory(MockFactory.ROLES) + RoleHistory roleHistory = new MockRoleHistory(MockFactory.ROLES) assert 3 == roleHistory.rebuild(loadedRoleHistory) } @@ -318,7 +319,7 @@ class TestRoleHistoryRW extends BaseMockAppStateTest { assert 4 == loadedRoleHistory.size() def expandedRoles = new ArrayList(MockFactory.ROLES) expandedRoles << PROVIDER_ROLE3 - RoleHistory roleHistory = new RoleHistory(expandedRoles) + RoleHistory roleHistory = new MockRoleHistory(expandedRoles) assert 0 == roleHistory.rebuild(loadedRoleHistory) } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5b7f6dde/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryRWOrdering.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryRWOrdering.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryRWOrdering.groovy index 0655531..8a0c1ca 100644 --- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryRWOrdering.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryRWOrdering.groovy @@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path import org.apache.slider.common.SliderKeys import org.apache.slider.server.appmaster.model.mock.BaseMockAppStateTest import org.apache.slider.server.appmaster.model.mock.MockFactory +import org.apache.slider.server.appmaster.model.mock.MockRoleHistory import org.apache.slider.server.appmaster.state.NodeEntry import org.apache.slider.server.appmaster.state.NodeInstance import org.apache.slider.server.appmaster.state.RoleHistory @@ -40,10 +41,10 @@ class TestRoleHistoryRWOrdering extends BaseMockAppStateTest { List<Path> paths = pathlist( [ - "hdfs://localhost/history-0406c.json", - "hdfs://localhost/history-5fffa.json", - "hdfs://localhost/history-0001a.json", - "hdfs://localhost/history-0001f.json", + "hdfs://localhost/history-0406c.json", + "hdfs://localhost/history-5fffa.json", + "hdfs://localhost/history-0001a.json", + "hdfs://localhost/history-0001f.json", ] ) Path h_0406c = paths[0] @@ -52,9 +53,7 @@ class TestRoleHistoryRWOrdering extends BaseMockAppStateTest { List<Path> pathlist(List<String> pathnames) { - def result = [] - pathnames.each { result << new Path(new URI(it as String)) } - result + pathnames.collect{ new Path(new URI(it as String)) } } @Override @@ -85,7 +84,7 @@ class TestRoleHistoryRWOrdering extends BaseMockAppStateTest { describe "test that if multiple entries are written, the newest is picked up" long time = System.currentTimeMillis(); - RoleHistory roleHistory = new RoleHistory(MockFactory.ROLES) + RoleHistory roleHistory = new MockRoleHistory(MockFactory.ROLES) assert !roleHistory.onStart(fs, historyPath) String addr = "localhost" NodeInstance instance = roleHistory.getOrCreateNodeInstance(addr) @@ -94,7 +93,7 @@ class TestRoleHistoryRWOrdering extends BaseMockAppStateTest { Path history1 = roleHistory.saveHistory(time++) Path history2 = roleHistory.saveHistory(time++) - Path history3 = roleHistory.saveHistory(time++) + Path history3 = roleHistory.saveHistory(time) //inject a later file with a different name sliderFileSystem.cat(new Path(historyPath, "file.json"), true, "hello, world") @@ -137,10 +136,10 @@ class TestRoleHistoryRWOrdering extends BaseMockAppStateTest { RoleHistoryWriter.sortHistoryPaths(paths2) assertListEquals(paths2, [ - paths[1], - paths[0], - paths[3], - paths[2] + paths[1], + paths[0], + paths[3], + paths[2] ]) } } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5b7f6dde/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryRequestTracking.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryRequestTracking.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryRequestTracking.groovy index c6dcb07..693ea9f 100644 --- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryRequestTracking.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryRequestTracking.groovy @@ -28,6 +28,7 @@ import org.apache.slider.providers.ProviderRole import org.apache.slider.server.appmaster.model.mock.BaseMockAppStateTest import org.apache.slider.server.appmaster.model.mock.MockContainer import org.apache.slider.server.appmaster.model.mock.MockFactory +import org.apache.slider.server.appmaster.model.mock.MockRoleHistory import org.apache.slider.server.appmaster.state.ContainerAllocationOutcome import org.apache.slider.server.appmaster.state.NodeEntry import org.apache.slider.server.appmaster.state.NodeInstance @@ -55,7 +56,7 @@ class TestRoleHistoryRequestTracking extends BaseMockAppStateTest { NodeInstance empty = new NodeInstance("empty", MockFactory.ROLE_COUNT) List<NodeInstance> nodes = [age2Active2, age2Active0, age4Active1, age1Active4, age3Active0] - RoleHistory roleHistory = new RoleHistory(MockFactory.ROLES) + RoleHistory roleHistory = new MockRoleHistory(MockFactory.ROLES) /** 1MB, 1 vcore*/ Resource resource = Resource.newInstance(1, 1) http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5b7f6dde/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/BaseMockAppStateTest.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/BaseMockAppStateTest.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/BaseMockAppStateTest.groovy index 44d35be..cefba42 100644 --- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/BaseMockAppStateTest.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/BaseMockAppStateTest.groovy @@ -27,12 +27,15 @@ import org.apache.hadoop.yarn.api.records.ContainerId import org.apache.hadoop.yarn.api.records.ContainerState import org.apache.hadoop.yarn.api.records.ContainerStatus import org.apache.hadoop.yarn.api.records.NodeReport +import org.apache.hadoop.yarn.client.api.AMRMClient import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.slider.common.tools.SliderFileSystem import org.apache.slider.common.tools.SliderUtils import org.apache.slider.core.conf.AggregateConf import org.apache.slider.core.main.LauncherExitCodes import org.apache.slider.server.appmaster.operations.AbstractRMOperation +import org.apache.slider.server.appmaster.operations.CancelSingleRequest +import org.apache.slider.server.appmaster.operations.ContainerRequestOperation import org.apache.slider.server.appmaster.state.AppState import org.apache.slider.server.appmaster.state.AppStateBindingInfo import org.apache.slider.server.appmaster.state.ContainerAssignment @@ -57,15 +60,6 @@ abstract class BaseMockAppStateTest extends SliderTestBase implements MockRoles protected MockApplicationId applicationId; protected MockApplicationAttemptId applicationAttemptId; - @Override - void setup() { - super.setup() - YarnConfiguration conf = SliderUtils.createConfiguration() - fs = HadoopFS.get(new URI("file:///"), conf) - sliderFileSystem = new SliderFileSystem(fs, conf) - engine = createYarnEngine() - } - /** * Override point: called in setup() to create the YARN engine; can * be changed for different sizes and options @@ -75,13 +69,22 @@ abstract class BaseMockAppStateTest extends SliderTestBase implements MockRoles return new MockYarnEngine(64, 1) } + + @Override + void setup() { + super.setup() + YarnConfiguration conf = SliderUtils.createConfiguration() + fs = HadoopFS.get(new URI("file:///"), conf) + sliderFileSystem = new SliderFileSystem(fs, conf) + engine = createYarnEngine() + initApp() + } + /** * Initialize the application. * This uses the binding information supplied by {@link #buildBindingInfo()}. */ - @Before void initApp(){ - String historyDirName = testName; YarnConfiguration conf = SliderUtils.createConfiguration() applicationId = new MockApplicationId(id: 1, clusterTimestamp: 0) @@ -291,27 +294,27 @@ abstract class BaseMockAppStateTest extends SliderTestBase implements MockRoles /** * Process the RM operations and send <code>onContainersAllocated</code> * events to the app state - * @param ops - * @param released - * @return + * @param operationsIn list of incoming ops + * @param released released containers + * @return list of outbound operations */ public List<RoleInstance> submitOperations( - List<AbstractRMOperation> ops, - List<ContainerId> released) { - List<Container> allocatedContainers = engine.execute(ops, released) + List<AbstractRMOperation> operationsIn, + List<ContainerId> released, + List<AbstractRMOperation> operationsOut = []) { + List<Container> allocatedContainers = engine.execute(operationsIn, released) List<ContainerAssignment> assignments = []; - List<AbstractRMOperation> operations = [] - appState.onContainersAllocated(allocatedContainers, assignments, operations) - List<RoleInstance> instances = [] - for (ContainerAssignment assigned : assignments) { + appState.onContainersAllocated(allocatedContainers, assignments, operationsOut) + + assignments.collect { + ContainerAssignment assigned -> Container container = assigned.container RoleInstance ri = roleInstance(assigned) - instances << ri //tell the app it arrived log.debug("Start submitted ${ri.role} on ${container.id} ") appState.containerStartSubmitted(container, ri); + ri } - return instances } /** @@ -334,13 +337,7 @@ abstract class BaseMockAppStateTest extends SliderTestBase implements MockRoles List<ContainerId> extractContainerIds( List<RoleInstance> instances, int role) { - List<ContainerId> cids = [] - instances.each { RoleInstance instance -> - if (instance.roleId == role) { - cids << instance.id - } - } - return cids + instances.findAll { it.roleId == role }.collect { RoleInstance instance -> instance.id } } /** @@ -363,4 +360,40 @@ abstract class BaseMockAppStateTest extends SliderTestBase implements MockRoles def recordAllFailed(int id, int count, List<NodeInstance> nodes) { nodes.each { NodeInstance node -> recordAsFailed(node, id, count)} } + + /** + * Get the container request of an indexed entry. Includes some assertions for better diagnostics + * @param ops operation list + * @param index index in the list + * @return the request. + */ + AMRMClient.ContainerRequest getRequest(List<AbstractRMOperation> ops, int index) { + assert index < ops.size() + def op = ops[index] + assert op instanceof ContainerRequestOperation + ((ContainerRequestOperation) op).request + } + + /** + * Get the cancel request of an indexed entry. Includes some assertions for better diagnostics + * @param ops operation list + * @param index index in the list + * @return the request. + */ + AMRMClient.ContainerRequest getCancel(List<AbstractRMOperation> ops, int index) { + assert index < ops.size() + def op = ops[index] + assert op instanceof CancelSingleRequest + ((CancelSingleRequest) op).request + } + + /** + * Get the single request of a list of operations; includes the check for the size + * @param ops operations list of size 1 + * @return the request within the first ContainerRequestOperation + */ + public AMRMClient.ContainerRequest getSingleRequest(List<AbstractRMOperation> ops) { + assert 1 == ops.size() + getRequest(ops, 0) + } } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5b7f6dde/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockRoleHistory.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockRoleHistory.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockRoleHistory.groovy index c521697..0a68afb 100644 --- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockRoleHistory.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockRoleHistory.groovy @@ -21,15 +21,21 @@ package org.apache.slider.server.appmaster.model.mock import org.apache.slider.core.exceptions.BadConfigException import org.apache.slider.providers.ProviderRole import org.apache.slider.server.appmaster.state.RoleHistory +import org.apache.slider.server.appmaster.state.RoleStatus /** * subclass to enable access to some of the protected methods */ class MockRoleHistory extends RoleHistory { + /** + * Take a list of provider roles and build the history from them, dynamically creating + * the role status entries on the way + * @param providerRoles provider role list + * @throws BadConfigException configuration problem with the role list + */ MockRoleHistory(List<ProviderRole> providerRoles) throws BadConfigException { - super(providerRoles) + super(providerRoles.collect { new RoleStatus(it) }) } - - + }
