SLIDER-799 escalation process implemented; tests are TBD
Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/6d955af0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/6d955af0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/6d955af0 Branch: refs/heads/feature/SLIDER-799-AM-managed-relax Commit: 6d955af0f4f6d02ec4f3080191a060464513f6b6 Parents: 559ed00 Author: Steve Loughran <[email protected]> Authored: Mon Mar 16 20:08:51 2015 +0000 Committer: Steve Loughran <[email protected]> Committed: Mon Mar 16 20:08:51 2015 +0000 ---------------------------------------------------------------------- .../org/apache/slider/api/InternalKeys.java | 12 +++- .../org/apache/slider/api/ResourceKeys.java | 2 +- .../providers/AbstractProviderService.java | 5 ++ .../server/appmaster/SliderAppMaster.java | 47 +++++++++++++--- .../actions/EscalateOutstandingRequests.java | 22 ++++++++ .../operations/AsyncRMOperationHandler.java | 8 ++- .../operations/CancelSingleRequest.java | 58 ++++++++++++++++++++ .../ProviderNotifyingOperationHandler.java | 5 ++ .../operations/RMOperationHandlerActions.java | 12 +++- .../slider/server/appmaster/state/AppState.java | 33 ++++++++--- .../server/appmaster/state/NodeInstance.java | 29 ++++------ .../appmaster/state/OutstandingRequest.java | 11 +++- .../state/OutstandingRequestTracker.java | 38 ++++++++++++- .../server/appmaster/state/RoleHistory.java | 34 ++++++++---- .../history/TestRoleHistoryNIComparators.groovy | 1 - .../model/mock/MockProviderService.groovy | 5 ++ .../model/mock/MockRMOperationHandler.groovy | 11 +++- 17 files changed, 280 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/6d955af0/slider-core/src/main/java/org/apache/slider/api/InternalKeys.java ---------------------------------------------------------------------- diff --git 
a/slider-core/src/main/java/org/apache/slider/api/InternalKeys.java b/slider-core/src/main/java/org/apache/slider/api/InternalKeys.java index b360fbe..38494a2 100644 --- a/slider-core/src/main/java/org/apache/slider/api/InternalKeys.java +++ b/slider-core/src/main/java/org/apache/slider/api/InternalKeys.java @@ -89,6 +89,7 @@ public interface InternalKeys { * Default short life threshold: {@value} */ int DEFAULT_INTERNAL_CONTAINER_FAILURE_SHORTLIFE = 60; + /** * Version of the app: {@value} */ @@ -177,5 +178,14 @@ public interface InternalKeys { * 100% for chaos values */ int PROBABILITY_PERCENT_100 = 100 * PROBABILITY_PERCENT_1; - + + /** + * interval between checks for escalation: {@value} + */ + String ESCALATION_CHECK_INTERVAL = "escalation.check.interval.seconds"; + + /** + * default value: {@value} + */ + int DEFAULT_ESCALATION_CHECK_INTERVAL = 30; } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/6d955af0/slider-core/src/main/java/org/apache/slider/api/ResourceKeys.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/api/ResourceKeys.java b/slider-core/src/main/java/org/apache/slider/api/ResourceKeys.java index ce2a54f..94ce681 100644 --- a/slider-core/src/main/java/org/apache/slider/api/ResourceKeys.java +++ b/slider-core/src/main/java/org/apache/slider/api/ResourceKeys.java @@ -145,7 +145,7 @@ public interface ResourceKeys { * Time in seconds to relax placement delay */ String PLACEMENT_RELAX_DELAY = - "yarn.placement.relax.delay.seconds"; + "yarn.placement.relax.seconds"; /** * Time to have a strict placement policy outstanding before http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/6d955af0/slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java 
b/slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java index fd7df73..7cba840 100644 --- a/slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java +++ b/slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java @@ -405,6 +405,11 @@ public abstract class AbstractProviderService } @Override + public void cancelSingleRequest(AMRMClient.ContainerRequest request) { + // no-op + } + + @Override public int cancelContainerRequests(Priority priority1, Priority priority2, int count) { http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/6d955af0/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java index 34bf20c..a93c60b 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java @@ -113,6 +113,7 @@ import org.apache.slider.providers.agent.AgentKeys; import org.apache.slider.providers.slideram.SliderAMClientProvider; import org.apache.slider.providers.slideram.SliderAMProviderService; import org.apache.slider.server.appmaster.actions.ActionRegisterServiceInstance; +import org.apache.slider.server.appmaster.actions.EscalateOutstandingRequests; import org.apache.slider.server.appmaster.actions.RegisterComponentInstance; import org.apache.slider.server.appmaster.actions.QueueExecutor; import org.apache.slider.server.appmaster.actions.QueueService; @@ -141,7 +142,6 @@ import org.apache.slider.server.appmaster.state.ProviderAppState; import org.apache.slider.server.appmaster.operations.RMOperationHandler; import org.apache.slider.server.appmaster.state.RoleInstance; import 
org.apache.slider.server.appmaster.state.RoleStatus; -import org.apache.slider.server.appmaster.state.SimpleReleaseSelector; import org.apache.slider.server.appmaster.web.AgentService; import org.apache.slider.server.appmaster.web.rest.InsecureAmFilterInitializer; import org.apache.slider.server.appmaster.web.rest.agent.AgentWebApp; @@ -918,7 +918,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService initCompleted.set(true); scheduleFailureWindowResets(instanceDefinition.getResources()); - + scheduleEscalation(instanceDefinition.getInternal()); try { @@ -1664,13 +1664,28 @@ public class SliderAppMaster extends AbstractSliderLaunchedService log.info( "Scheduling the failure window reset interval to every {} seconds", seconds); - RenewingAction<ResetFailureWindow> renew = new RenewingAction<ResetFailureWindow>( + RenewingAction<ResetFailureWindow> renew = new RenewingAction<>( reset, seconds, seconds, TimeUnit.SECONDS, 0); actionQueues.renewing("failures", renew); } else { log.info("Failure window reset interval is not set"); } } + + /** + * Schedule the escalation action + * @param internal + * @throws BadConfigException + */ + private void scheduleEscalation(ConfTree internal) throws BadConfigException { + EscalateOutstandingRequests escalate = new EscalateOutstandingRequests(); + ConfTreeOperations ops = new ConfTreeOperations(internal); + int seconds = ops.getGlobalOptions().getOptionInt(InternalKeys.ESCALATION_CHECK_INTERVAL, + InternalKeys.DEFAULT_ESCALATION_CHECK_INTERVAL); + RenewingAction<EscalateOutstandingRequests> renew = new RenewingAction<>( + escalate, seconds, seconds, TimeUnit.SECONDS, 0); + actionQueues.renewing("escalation", renew); + } /** * Look at where the current node state is -and whether it should be changed @@ -1728,13 +1743,27 @@ public class SliderAppMaster extends AbstractSliderLaunchedService queue(new ActionStopSlider(e)); } } - + + /** + * Escalate operation as triggered by external timer. 
+ * <p> + * Get the list of new operations off the AM, then executes them. + */ + public void escalateOutstandingRequests() { + List<AbstractRMOperation> operations = appState.escalateOutstandingRequests(); + providerRMOperationHandler.execute(operations); + execute(operations); + } + + /** * Shutdown operation: release all containers */ private void releaseAllContainers() { + List<AbstractRMOperation> operations = appState.releaseAllContainers(); + providerRMOperationHandler.execute(operations); //now apply the operations - execute(appState.releaseAllContainers()); + execute(operations); } /** @@ -1808,8 +1837,12 @@ public class SliderAppMaster extends AbstractSliderLaunchedService return rmOperationHandler.cancelContainerRequests(priority1, priority2, count); } - -/* =================================================================== */ + @Override + public void cancelSingleRequest(AMRMClient.ContainerRequest request) { + rmOperationHandler.cancelSingleRequest(request); + } + + /* =================================================================== */ /* END */ /* =================================================================== */ http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/6d955af0/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/EscalateOutstandingRequests.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/EscalateOutstandingRequests.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/EscalateOutstandingRequests.java new file mode 100644 index 0000000..e527be6 --- /dev/null +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/EscalateOutstandingRequests.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.server.appmaster.actions; + +public class EscalateOutstandingRequests { +} http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/6d955af0/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/AsyncRMOperationHandler.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/AsyncRMOperationHandler.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/AsyncRMOperationHandler.java index 1cbb960..7c98551 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/AsyncRMOperationHandler.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/AsyncRMOperationHandler.java @@ -78,7 +78,7 @@ public class AsyncRMOperationHandler extends RMOperationHandler { break; } // a single release - client.removeContainerRequest(request); + cancelSingleRequest(request); remaining --; } } @@ -86,6 +86,12 @@ public class AsyncRMOperationHandler extends RMOperationHandler { } @Override + public void cancelSingleRequest(AMRMClient.ContainerRequest request) { + // a single release + client.removeContainerRequest(request); + } + + @Override public void 
releaseAssignedContainer(ContainerId containerId) { log.debug("Releasing container {}", containerId); http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/6d955af0/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/CancelSingleRequest.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/CancelSingleRequest.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/CancelSingleRequest.java new file mode 100644 index 0000000..e4ccb10 --- /dev/null +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/CancelSingleRequest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.slider.server.appmaster.operations; + +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.slider.server.appmaster.state.ContainerPriority; + +/** + * Cancel a container request + */ +public class CancelSingleRequest extends AbstractRMOperation { + + private final Priority priority1; + private final Priority priority2; + private final int count; + + public CancelSingleRequest(Priority priority1, Priority priority2, int count) { + this.priority1 = priority1; + this.priority2 = priority2; + this.count = count; + } + + @Override + public void execute(RMOperationHandlerActions handler) { + handler.cancelContainerRequests(priority1, priority2, count); + } + + @Override + public String toString() { + return "release " + count + + " requests for " + ContainerPriority.toString(priority1) + + " and " + ContainerPriority.toString(priority2); + } + + /** + * Get the number to release + * @return the number of containers to release + */ + public int getCount() { + return count; + } +} http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/6d955af0/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ProviderNotifyingOperationHandler.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ProviderNotifyingOperationHandler.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ProviderNotifyingOperationHandler.java index 66df566..184a36a 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ProviderNotifyingOperationHandler.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ProviderNotifyingOperationHandler.java @@ -47,4 +47,9 @@ public class ProviderNotifyingOperationHandler extends RMOperationHandler { int count) { return providerService.cancelContainerRequests(priority1, priority2, count); } + + @Override + public 
void cancelSingleRequest(AMRMClient.ContainerRequest request) { + providerService.cancelSingleRequest(request); + } } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/6d955af0/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/RMOperationHandlerActions.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/RMOperationHandlerActions.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/RMOperationHandlerActions.java index 97fde09..594ee47 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/RMOperationHandlerActions.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/RMOperationHandlerActions.java @@ -27,9 +27,19 @@ import java.util.List; public interface RMOperationHandlerActions { void releaseAssignedContainer(ContainerId containerId); - void addContainerRequest(AMRMClient.ContainerRequest req); + /** + * Issue a container request + * @param request + */ + void addContainerRequest(AMRMClient.ContainerRequest request); /** + * Cancel a specific request + * @param request request to cancel + */ + void cancelSingleRequest(AMRMClient.ContainerRequest request); + + /** * Remove a container request * @param priority1 priority to remove at * @param priority2 second priority to target http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/6d955af0/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java index f2c237c..a887107 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java +++ 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java @@ -37,7 +37,6 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import static org.apache.slider.api.StateValues.*; import org.apache.slider.api.ClusterDescription; import org.apache.slider.api.ClusterDescriptionKeys; import org.apache.slider.api.ClusterDescriptionOperations; @@ -88,8 +87,20 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; -import static org.apache.slider.api.ResourceKeys.*; -import static org.apache.slider.api.RoleKeys.*; +import static org.apache.slider.api.ResourceKeys.DEF_YARN_CORES; +import static org.apache.slider.api.ResourceKeys.DEF_YARN_LABEL_EXPRESSION; +import static org.apache.slider.api.ResourceKeys.DEF_YARN_MEMORY; +import static org.apache.slider.api.ResourceKeys.YARN_CORES; +import static org.apache.slider.api.ResourceKeys.YARN_LABEL_EXPRESSION; +import static org.apache.slider.api.ResourceKeys.YARN_MEMORY; +import static org.apache.slider.api.RoleKeys.ROLE_FAILED_INSTANCES; +import static org.apache.slider.api.RoleKeys.ROLE_FAILED_STARTING_INSTANCES; +import static org.apache.slider.api.RoleKeys.ROLE_RELEASING_INSTANCES; +import static org.apache.slider.api.RoleKeys.ROLE_REQUESTED_INSTANCES; +import static org.apache.slider.api.StateValues.STATE_CREATED; +import static org.apache.slider.api.StateValues.STATE_DESTROYED; +import static org.apache.slider.api.StateValues.STATE_LIVE; +import static org.apache.slider.api.StateValues.STATE_SUBMITTED; /** @@ -704,7 +715,7 @@ public class AppState { clusterStatusTemplate = ClusterDescriptionOperations.buildFromInstanceDefinition( - instanceDefinition); + instanceDefinition); // Add the -site configuration properties @@ -1286,7 
+1297,7 @@ public class AppState { int maxVal) { String val = resources.getComponentOpt(name, option, - Integer.toString(defVal)); + Integer.toString(defVal)); Integer intVal; if (ResourceKeys.YARN_RESOURCE_MAX.equals(val)) { intVal = maxVal; @@ -1856,11 +1867,19 @@ public class AppState { } roleHistory.resetFailedRecently(); } + + /** + * Escalate operation as triggered by external timer. + * @return a (usually empty) list of cancel/request operations. + */ + public List<AbstractRMOperation> escalateOutstandingRequests() { + return roleHistory.escalateOutstandingRequests(); + } /** * Look at the allocation status of one role, and trigger add/release * actions if the number of desired role instances doesn't equal - * (actual+pending). + * (actual + pending). * <p> * MUST be executed from within a synchronized method * <p> @@ -2046,7 +2065,7 @@ public class AppState { Collection<RoleInstance> targets = cloneOwnedContainerList(); log.info("Releasing {} containers", targets.size()); List<AbstractRMOperation> operations = - new ArrayList<AbstractRMOperation>(targets.size()); + new ArrayList<>(targets.size()); for (RoleInstance instance : targets) { if (instance.roleId == SliderKeys.ROLE_AM_PRIORITY_INDEX) { // don't worry about the AM http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/6d955af0/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeInstance.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeInstance.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeInstance.java index 71b74fc..68c8a15 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeInstance.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeInstance.java @@ -41,7 +41,7 @@ public class NodeInstance { */ public NodeInstance(String hostname, int roles) { this.hostname = hostname; 
- nodeEntries = new ArrayList<NodeEntry>(roles); + nodeEntries = new ArrayList<>(roles); } /** @@ -103,12 +103,8 @@ public class NodeInstance { */ public boolean isConsideredUnreliable(int role, int threshold) { NodeEntry entry = get(role); - - if (entry != null) { - return entry.getFailedRecently() > threshold; - } else { - return false; - } + + return entry != null && entry.getFailedRecently() > threshold; } /** @@ -131,7 +127,8 @@ public class NodeInstance { /** - * run through each entry; gc'ing & removing old ones + * run through each entry; gc'ing & removing old ones that don't have + * a recent failure count (we care about those) * @param absoluteTime age in millis * @return true if there are still entries left */ @@ -140,7 +137,7 @@ public class NodeInstance { ListIterator<NodeEntry> entries = nodeEntries.listIterator(); while (entries.hasNext()) { NodeEntry entry = entries.next(); - if (entry.notUsedSince(absoluteTime)) { + if (entry.notUsedSince(absoluteTime) && entry.getFailedRecently() == 0) { entries.remove(); } else { active = true; @@ -203,13 +200,8 @@ public class NodeInstance { /** * A comparator for sorting entries where the node is preferred over another. - * If there's no entry for an element then it's failure count is set to -1, age to 0 - * for the purposes of the comparison - * <ol> - * <li>Entry exists => end of list as unknown</li> - * <li>Lowest failure count</li> - * <li>Age</li> - * </ol> + * <p> + * The exact algorithm may change * * @return +ve int if left is preferred to right; -ve if right over left, 0 for equal */ @@ -227,7 +219,8 @@ public class NodeInstance { NodeEntry left = o1.get(role); NodeEntry right = o2.get(role); - // sort by failure count first +/* + // sort by failure count int failL = left != null ? left.getFailedRecently() : -1; int failR = right != null ? 
right.getFailedRecently() : -1; @@ -237,7 +230,7 @@ public class NodeInstance { if (failR > failL) { return -1; } - + */ // failure counts are equal: compare age long ageL = left != null ? left.getLastUsed() : 0; long ageR = right != null ? right.getLastUsed() : 0; http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/6d955af0/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java index f6b83a7..7ad3fbb 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java @@ -187,7 +187,7 @@ public final class OutstandingRequest { * as the original one, and the same host, but: relaxed placement, and a changed priority * so as to place it into the relaxed list. 
*/ - public AMRMClient.ContainerRequest buildEscalatedContainerRequest() { + public AMRMClient.ContainerRequest escalate() { escalated = true; Preconditions.checkNotNull(issuedRequest, "issued request"); Priority pri = ContainerPriority.createPriority(roleId, true); @@ -220,6 +220,15 @@ public final class OutstandingRequest { } /** + * Query to see if the request is ready to be escalated + * @param time time to check against + * @return true if escalation should begin + */ + public boolean shouldEscalate(long time) { + return !escalated && escalationTimeoutMillis < time; + } + + /** * Equality is on hostname and role * @param o other * @return true on a match http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/6d955af0/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java index e197a86..eb58e74 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java @@ -19,6 +19,10 @@ package org.apache.slider.server.appmaster.state; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.slider.server.appmaster.operations.AbstractRMOperation; +import org.apache.slider.server.appmaster.operations.CancelSingleRequest; +import org.apache.slider.server.appmaster.operations.ContainerRequestOperation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,6 +52,11 @@ public class OutstandingRequestTracker { protected static final Logger log = LoggerFactory.getLogger(OutstandingRequestTracker.class); + /** + * no requests; 
saves creating a new list if not needed + */ + private final List<AbstractRMOperation> NO_REQUESTS = new ArrayList<>(0); + private Map<OutstandingRequest, OutstandingRequest> placedRequests = new HashMap<>(); @@ -122,10 +131,10 @@ public class OutstandingRequestTracker { } /** - * Get the age of a container. If it is not known in the history, + * Get the age of the node hosting a container. If it is not known in the history, * return 0. * @param c container - * @return age, null if + * @return age, or 0 if there's no entry for it. */ private long getAgeOf(Container c) { long age = 0; @@ -224,4 +233,29 @@ public class OutstandingRequestTracker { public synchronized List<OutstandingRequest> listOutstandingRequests() { return new ArrayList<>(placedRequests.values()); } + + /** + * Escalate operation as triggered by external timer. + * @return a (usually empty) list of cancel/request operations. + */ + public synchronized List<AbstractRMOperation> escalateOutstandingRequests(long now) { + if (placedRequests.isEmpty()) { + return NO_REQUESTS; + } + + List<AbstractRMOperation> operations = new ArrayList<>(); + for (OutstandingRequest outstandingRequest : placedRequests.values()) { + if (outstandingRequest.shouldEscalate(now)) { + + // time to escalate + CancelSingleRequest cancel = new CancelSingleRequest(outstandingRequest.issuedRequest); + operations.add(cancel); + AMRMClient.ContainerRequest escalated = + outstandingRequest.escalate(); + operations.add(new ContainerRequestOperation(escalated)); + } + + } + return operations; + } } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/6d955af0/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java index acfe606..0d4de2d 100644 --- 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java @@ -18,17 +18,7 @@ package org.apache.slider.server.appmaster.state; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; - +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.Container; @@ -38,12 +28,22 @@ import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.slider.common.tools.SliderUtils; import org.apache.slider.core.exceptions.BadConfigException; import org.apache.slider.providers.ProviderRole; +import org.apache.slider.server.appmaster.operations.AbstractRMOperation; import org.apache.slider.server.avro.RoleHistoryHeader; import org.apache.slider.server.avro.RoleHistoryWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; /** * The Role History. @@ -854,4 +854,14 @@ public class RoleHistory { return lst; } + + /** + * Escalate operation as triggered by external timer. + * @return a (usually empty) list of cancel/request operations. 
+ */ + public List<AbstractRMOperation> escalateOutstandingRequests() { + long now = now(); + return outstandingRequests.escalateOutstandingRequests(now); + + } } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/6d955af0/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryNIComparators.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryNIComparators.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryNIComparators.groovy index 74dfd42..b26b2f0 100644 --- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryNIComparators.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryNIComparators.groovy @@ -63,7 +63,6 @@ class TestRoleHistoryNIComparators extends BaseMockAppStateTest { def preferred = new NodeInstance.Preferred(0) assert preferred.compare(age6failing, age1failing) == -1 assert preferred.compare(age1failing, age6failing) == 1 - assert preferred.compare(age1failing, age1failing) == 0 } @Test http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/6d955af0/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockProviderService.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockProviderService.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockProviderService.groovy index f8ab56d..44415f4 100644 --- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockProviderService.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockProviderService.groovy @@ -276,6 +276,11 @@ class MockProviderService implements 
ProviderService { } @Override + void cancelSingleRequest(AMRMClient.ContainerRequest request) { + + } + + @Override void execute(List<AbstractRMOperation> operations) { } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/6d955af0/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockRMOperationHandler.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockRMOperationHandler.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockRMOperationHandler.groovy index 297c597..a68ce02 100644 --- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockRMOperationHandler.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockRMOperationHandler.groovy @@ -39,7 +39,7 @@ class MockRMOperationHandler extends RMOperationHandler { @Override public void releaseAssignedContainer(ContainerId containerId) { operations.add(new ContainerReleaseOperation(containerId)) - log.info("Releasing container ID " + containerId.getId()) + log.info("Releasing container ID " + containerId.containerId) releases++; } @@ -61,6 +61,15 @@ class MockRMOperationHandler extends RMOperationHandler { return releaseable; } + @Override + void cancelSingleRequest(AMRMClient.ContainerRequest request) { + // here assume that there is a copy of this request in the list + if (availableToCancel > 0) { + availableToCancel--; + cancelled++; + } + } + /** * clear the history */
