SLIDER-988 add mock test of failure of AA container and re-request; fix any failures
Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/830864ff Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/830864ff Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/830864ff Branch: refs/heads/feature/SLIDER-82-pass-3.1 Commit: 830864ff701a7d1682c39def20dda909c3b4e7e5 Parents: 591ba99 Author: Steve Loughran <[email protected]> Authored: Tue Nov 17 19:55:59 2015 +0000 Committer: Steve Loughran <[email protected]> Committed: Tue Nov 17 19:55:59 2015 +0000 ---------------------------------------------------------------------- .../apache/slider/common/tools/SliderUtils.java | 15 ++- .../management/BoolMetricPredicate.java | 44 ++++++++ .../management/LongMetricFunction.java | 44 ++++++++ .../management/MetricsAndMonitoring.java | 51 ++++++++- .../management/MetricsBindingService.java | 12 +- .../appmaster/management/MetricsConstants.java | 2 + .../management/PrefixedMetricsSet.java | 53 +++++++++ .../slider/server/appmaster/state/AppState.java | 69 +++++++----- .../appmaster/state/OutstandingRequest.java | 6 +- .../server/appmaster/state/RoleStatus.java | 82 ++++++++++---- .../TestMockAppStateAAOvercapacity.groovy | 110 +++++++++++++++++++ .../appstate/TestMockAppStateAAPlacement.groovy | 11 +- .../TestRoleHistoryContainerEvents.groovy | 4 +- .../model/mock/BaseMockAppStateTest.groovy | 14 --- 14 files changed, 438 insertions(+), 79 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/830864ff/slider-core/src/main/java/org/apache/slider/common/tools/SliderUtils.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/common/tools/SliderUtils.java b/slider-core/src/main/java/org/apache/slider/common/tools/SliderUtils.java index 5bf8622..eb7a9d5 100644 --- 
a/slider-core/src/main/java/org/apache/slider/common/tools/SliderUtils.java +++ b/slider-core/src/main/java/org/apache/slider/common/tools/SliderUtils.java @@ -155,6 +155,7 @@ public final class SliderUtils { * name of docker program */ public static final String DOCKER = "docker"; + public static final int NODE_LIST_LIMIT = 10; private SliderUtils() { } @@ -2468,7 +2469,7 @@ public final class SliderUtils { * @return +ve if x is less than y, -ve if y is greater than x; 0 for equality */ public static int compareTwoLongsReverse(long x, long y) { - return (x < y) ? +1 : ((x == y) ? 0 : -1); + return (x < y) ? 1 : ((x == y) ? 0 : -1); } public static String getSystemEnv(String property) { @@ -2490,9 +2491,15 @@ public final class SliderUtils { } List<String> nodes = request.getNodes(); if (nodes != null) { - buffer.append("Nodes = [") - .append(join(nodes, ", ", false)) - .append("]; "); + buffer.append("Nodes = [ "); + int size = nodes.size(); + for (int i = 0; i < Math.min(NODE_LIST_LIMIT, size); i++) { + buffer.append(nodes.get(i)).append(' '); + } + if (size > NODE_LIST_LIMIT) { + buffer.append(String.format("...(total %d entries)", size)); + } + buffer.append("]; "); } List<String> racks = request.getRacks(); if (racks != null) { http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/830864ff/slider-core/src/main/java/org/apache/slider/server/appmaster/management/BoolMetricPredicate.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/management/BoolMetricPredicate.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/management/BoolMetricPredicate.java new file mode 100644 index 0000000..82bcd3a --- /dev/null +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/management/BoolMetricPredicate.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.server.appmaster.management; + +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Metric; + +/** + * A metric which takes a predicate and returns 1 if the predicate evaluates + * to true. The predicate is evaluated whenever the metric is read. + */ +public class BoolMetricPredicate implements Metric, Gauge<Integer> { + + private final Eval predicate; + + public BoolMetricPredicate(Eval predicate) { + this.predicate = predicate; + } + + @Override + public Integer getValue() { + return predicate.eval() ? 
1: 0; + } + + public interface Eval { + boolean eval(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/830864ff/slider-core/src/main/java/org/apache/slider/server/appmaster/management/LongMetricFunction.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/management/LongMetricFunction.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/management/LongMetricFunction.java new file mode 100644 index 0000000..1de7345 --- /dev/null +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/management/LongMetricFunction.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.server.appmaster.management; + +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Metric; + +/** + * A metric which takes a function to generate a long value. + * The function is evaluated whenever the metric is read. 
+ */ +public class LongMetricFunction implements Metric, Gauge<Long> { + + private final Eval function; + + public LongMetricFunction(Eval function) { + this.function = function; + } + + @Override + public Long getValue() { + return function.eval(); + } + + public interface Eval { + long eval(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/830864ff/slider-core/src/main/java/org/apache/slider/server/appmaster/management/MetricsAndMonitoring.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/management/MetricsAndMonitoring.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/management/MetricsAndMonitoring.java index cced42a..37a8935 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/management/MetricsAndMonitoring.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/management/MetricsAndMonitoring.java @@ -20,9 +20,12 @@ package org.apache.slider.server.appmaster.management; import com.codahale.metrics.Metric; import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.MetricSet; import com.codahale.metrics.health.HealthCheckRegistry; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.CompositeService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; @@ -33,7 +36,8 @@ import java.util.concurrent.ConcurrentHashMap; * Class for all metrics and monitoring */ public class MetricsAndMonitoring extends CompositeService { - + protected static final Logger log = + LoggerFactory.getLogger(MetricsAndMonitoring.class); public MetricsAndMonitoring(String name) { super(name); } @@ -52,13 +56,15 @@ public class MetricsAndMonitoring extends CompositeService { private final Map<String, MeterAndCounter> meterAndCounterMap = new ConcurrentHashMap<>(); + private final List<MetricSet> metricSets = new 
ArrayList<>(); + /** * List of recorded events */ private final List<RecordedEvent> eventHistory = new ArrayList<>(100); public static final int EVENT_LIMIT = 1000; - + public MetricRegistry getMetrics() { return metrics; } @@ -74,6 +80,14 @@ public class MetricsAndMonitoring extends CompositeService { super.serviceInit(conf); } + @Override + protected void serviceStop() throws Exception { + super.serviceStop(); + for (MetricSet set : metricSets) { + unregister(set); + } + } + public MeterAndCounter getMeterAndCounter(String name) { return meterAndCounterMap.get(name); } @@ -144,5 +158,38 @@ public class MetricsAndMonitoring extends CompositeService { public synchronized List<RecordedEvent> cloneEventHistory() { return new ArrayList<>(eventHistory); } + + /** + * Add a metric set for registering and deregistration on service stop + * @param metricSet metric set + */ + public void addMetricSet(MetricSet metricSet) { + metricSets.add(metricSet); + metrics.registerAll(metricSet); + } + + /** + * add a metric set, giving each entry a prefix + * @param prefix prefix (a trailing "." 
is automatically added) + * @param metricSet the metric set to register + */ + public void addMetricSet(String prefix, MetricSet metricSet) { + addMetricSet(new PrefixedMetricsSet(prefix, metricSet)); + } + + /** + * Unregister a metric set; robust + * @param metricSet metric set to unregister + */ + public void unregister(MetricSet metricSet) { + for (String s : metricSet.getMetrics().keySet()) { + try { + metrics.remove(s); + } catch (IllegalArgumentException e) { + // log but continue + log.info("Exception when trying to unregister {}", s, e); + } + } + } } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/830864ff/slider-core/src/main/java/org/apache/slider/server/appmaster/management/MetricsBindingService.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/management/MetricsBindingService.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/management/MetricsBindingService.java index f8646bf..864a1cf 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/management/MetricsBindingService.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/management/MetricsBindingService.java @@ -19,7 +19,9 @@ package org.apache.slider.server.appmaster.management; import com.codahale.metrics.JmxReporter; +import com.codahale.metrics.Metric; import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.MetricSet; import com.codahale.metrics.ScheduledReporter; import com.codahale.metrics.Slf4jReporter; import com.google.common.base.Preconditions; @@ -29,6 +31,9 @@ import org.apache.slider.server.services.workflow.ClosingService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; /** @@ -79,7 +84,7 @@ public class MetricsBindingService extends CompositeService JmxReporter jmxReporter; 
jmxReporter = JmxReporter.forRegistry(metrics).build(); jmxReporter.start(); - addService(new ClosingService<JmxReporter>(jmxReporter)); + addService(new ClosingService<>(jmxReporter)); // Ganglia @@ -128,7 +133,7 @@ public class MetricsBindingService extends CompositeService .convertDurationsTo(TimeUnit.MILLISECONDS) .build(); reporter.start(interval, TimeUnit.MINUTES); - addService(new ClosingService<ScheduledReporter>(reporter)); + addService(new ClosingService<>(reporter)); summary.append(String.format(", SLF4J to log %s interval=%d", logName, interval)); } @@ -136,8 +141,11 @@ public class MetricsBindingService extends CompositeService log.info(reportingDetails); } + @Override public String toString() { return super.toString() + " " + reportingDetails; } + + } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/830864ff/slider-core/src/main/java/org/apache/slider/server/appmaster/management/MetricsConstants.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/management/MetricsConstants.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/management/MetricsConstants.java index 31a82a3..fa6bfc0 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/management/MetricsConstants.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/management/MetricsConstants.java @@ -53,4 +53,6 @@ public class MetricsConstants { */ public static final String CONTAINERS_START_FAILED = "containers.start-failed"; + public static final String PREFIX_SLIDER_ROLES = "slider.roles."; + } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/830864ff/slider-core/src/main/java/org/apache/slider/server/appmaster/management/PrefixedMetricsSet.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/management/PrefixedMetricsSet.java 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/management/PrefixedMetricsSet.java new file mode 100644 index 0000000..e9ad46a --- /dev/null +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/management/PrefixedMetricsSet.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.server.appmaster.management; + +import com.codahale.metrics.Metric; +import com.codahale.metrics.MetricSet; + +import java.util.HashMap; +import java.util.Map; + +/** + * From an existing metrics set, generate a new metrics set with the + * prefix in front of every key. + * + * A '.' separator is inserted automatically between the prefix and each + * metric key, so a prefix that already ends in '.' yields a double separator.
+ */ +public class PrefixedMetricsSet implements MetricSet { + + private final String prefix; + private final MetricSet source; + + public PrefixedMetricsSet(String prefix, MetricSet source) { + this.prefix = prefix; + this.source = source; + } + + @Override + public Map<String, Metric> getMetrics() { + Map<String, Metric> sourceMetrics = source.getMetrics(); + Map<String, Metric> metrics = new HashMap<>(sourceMetrics.size()); + for (Map.Entry<String, Metric> entry : sourceMetrics.entrySet()) { + metrics.put(prefix + "." + entry.getKey(), entry.getValue()); + } + return metrics; + } +} http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/830864ff/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java index 2da5d36..49bc225 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java @@ -839,8 +839,11 @@ public class AppState { } RoleStatus roleStatus = new RoleStatus(providerRole); roleStatusMap.put(priority, roleStatus); - roles.put(providerRole.name, providerRole); + String name = providerRole.name; + roles.put(name, providerRole); rolePriorityMap.put(priority, providerRole); + // register its entries + metricsAndMonitoring.addMetricSet(MetricsConstants.PREFIX_SLIDER_ROLES + name, roleStatus); return roleStatus; } @@ -1511,7 +1514,6 @@ public class AppState { public int exitStatus = 0; public boolean unknownNode = false; - public String toString() { final StringBuilder sb = new StringBuilder("NodeCompletionResult{"); @@ -1969,7 +1971,8 @@ public class AppState { expected = role.getDesired(); } - log.info("Reviewing {} : expected {}", role, expected); + 
log.info("Reviewing {} : ", role); + log.debug("Expected {}, Delta: {}", expected, delta); checkFailureThreshold(role); if (expected < 0 ) { @@ -1986,29 +1989,28 @@ public class AppState { log.info("{}: Asking for {} more nodes(s) for a total of {} ", name, delta, expected); if (role.isAntiAffinePlacement()) { - // build one only if there is none outstanding, the role history knows - // enough about the cluster to ask, and there is somewhere to place - // the node - if (role.getPendingAntiAffineRequests() == 0 - && !role.isAARequestOutstanding() - && roleHistory.canPlaceAANodes()) { - // log the number outstanding - AMRMClient.ContainerRequest request = createAAContainerRequest(role); - if (request != null) { - log.info("Starting an anti-affine request sequence for {} nodes", delta); - role.incPendingAntiAffineRequests(delta - 1); - addContainerRequest(operations, request); - } else { - log.info("No location for anti-affine request"); + long pending = delta; + if (roleHistory.canPlaceAANodes()) { + // build one only if there is none outstanding, the role history knows + // enough about the cluster to ask, and there is somewhere to place + // the node + if (!role.isAARequestOutstanding()) { + // no outstanding AA; try to place things + AMRMClient.ContainerRequest request = createAAContainerRequest(role); + if (request != null) { + pending--; + log.info("Starting an anti-affine request sequence for {} nodes; pending={}", + delta, pending); + addContainerRequest(operations, request); + } else { + log.info("No location for anti-affine request"); + } } } else { - if (roleHistory.canPlaceAANodes()) { - log.info("Adding {} more anti-affine requests", delta); - } else { - log.warn("Awaiting node map before generating node requests"); - } - role.incPendingAntiAffineRequests(delta); + log.warn("Awaiting node map before generating anti-affinity requests"); } + log.info("Setting pending to {}", pending); + role.setPendingAntiAffineRequests(pending); } else { for (int i = 0; i 
< delta; i++) { @@ -2024,7 +2026,7 @@ public class AppState { long excess = -delta; // how many requests are outstanding? for AA roles, this includes pending - long outstandingRequests = role.getRequested(); + long outstandingRequests = role.getRequested() + role.getPendingAntiAffineRequests(); if (outstandingRequests > 0) { // outstanding requests. int toCancel = (int)Math.min(outstandingRequests, excess); @@ -2084,13 +2086,11 @@ public class AppState { roleId, containersToRelease); - //crop to the excess - - List<RoleInstance> finalCandidates = (excess < numberAvailableForRelease) + // crop to the excess + List<RoleInstance> finalCandidates = (excess < numberAvailableForRelease) ? containersToRelease.subList(0, (int)excess) : containersToRelease; - // then build up a release operation, logging each container as released for (RoleInstance possible : finalCandidates) { log.info("Targeting for release: {}", possible); @@ -2099,9 +2099,17 @@ public class AppState { } } + } else { + // actual + requested == desired + // there's a special case here: clear all pending AA requests + if (role.getPendingAntiAffineRequests() > 0) { + log.debug("Clearing outstanding pending AA requests"); + role.setPendingAntiAffineRequests(0); + } } - // list of operations to execute + // there's now a list of operations to execute + log.debug("operations scheduled: {}; updated role: {}", operations.size(), role); return operations; } @@ -2274,9 +2282,10 @@ public class AppState { if (role.getPendingAntiAffineRequests() > 0) { // still an outstanding AA request: need to issue a new one. 
log.info("Asking for next container for AA role {}", roleName); - role.decPendingAntiAffineRequests(); if (!addContainerRequest(operations, createAAContainerRequest(role))) { log.info("No capacity in cluster for new requests"); + } else { + role.decPendingAntiAffineRequests(); } log.debug("Current AA role status {}", role); } else { http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/830864ff/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java index 129fd4c..3a75f27 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java @@ -342,10 +342,12 @@ public final class OutstandingRequest extends RoleHostnamePair { @Override public String toString() { - int requestRoleId = ContainerPriority.extractRole(getPriority()); boolean requestHasLocation = ContainerPriority.hasLocation(getPriority()); final StringBuilder sb = new StringBuilder("OutstandingRequest{"); - sb.append(super.toString()); + sb.append("roleId=").append(roleId); + if (hostname != null) { + sb.append(", hostname='").append(hostname).append('\''); + } sb.append(", node=").append(node); sb.append(", hasLocation=").append(requestHasLocation); sb.append(", requestedTimeMillis=").append(requestedTimeMillis); http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/830864ff/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java index 0fc3dc2..656f96c 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java @@ -18,16 +18,21 @@ package org.apache.slider.server.appmaster.state; +import com.codahale.metrics.Metric; +import com.codahale.metrics.MetricSet; import com.google.common.base.Preconditions; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.slider.api.types.ComponentInformation; import org.apache.slider.api.types.RoleStatistics; import org.apache.slider.providers.PlacementPolicy; import org.apache.slider.providers.ProviderRole; +import org.apache.slider.server.appmaster.management.BoolMetric; +import org.apache.slider.server.appmaster.management.BoolMetricPredicate; import org.apache.slider.server.appmaster.management.LongGauge; import java.io.Serializable; import java.util.Comparator; +import java.util.HashMap; import java.util.Map; /** @@ -38,7 +43,7 @@ import java.util.Map; * requires synchronization. Where synchronized access is good is that it allows for * the whole instance to be locked, for updating multiple entries. 
*/ -public final class RoleStatus implements Cloneable { +public final class RoleStatus implements Cloneable, MetricSet { private final String name; @@ -48,32 +53,30 @@ public final class RoleStatus implements Cloneable { private final int key; private final ProviderRole providerRole; - private final LongGauge desired = new LongGauge(); private final LongGauge actual = new LongGauge(); - private final LongGauge requested = new LongGauge(); - private final LongGauge releasing = new LongGauge(); - private final LongGauge failed = new LongGauge(); - private final LongGauge startFailed = new LongGauge(); - private final LongGauge started= new LongGauge(); private final LongGauge completed = new LongGauge(); - private final LongGauge totalRequested = new LongGauge(); - private final LongGauge preempted = new LongGauge(0); - private final LongGauge nodeFailed = new LongGauge(0); + private final LongGauge desired = new LongGauge(); + private final LongGauge failed = new LongGauge(); private final LongGauge failedRecently = new LongGauge(0); private final LongGauge limitsExceeded = new LongGauge(0); + private final LongGauge nodeFailed = new LongGauge(0); + /** Number of AA requests queued. */ + private final LongGauge pendingAntiAffineRequests = new LongGauge(0); + private final LongGauge preempted = new LongGauge(0); + private final LongGauge releasing = new LongGauge(); + private final LongGauge requested = new LongGauge(); + private final LongGauge started = new LongGauge(); + private final LongGauge startFailed = new LongGauge(); + private final LongGauge totalRequested = new LongGauge(); /** resource requirements */ private Resource resourceRequirements; - /** - * Number of AA requests queued. These should be reduced first on a - * flex down. 
- */ - private final LongGauge pendingAntiAffineRequests = new LongGauge(0); /** any pending AA request */ private volatile OutstandingRequest outstandingAArequest = null; + private String failureMessage = ""; public RoleStatus(ProviderRole providerRole) { @@ -81,7 +84,37 @@ public final class RoleStatus implements Cloneable { this.name = providerRole.name; this.key = providerRole.id; } - + + @Override + public Map<String, Metric> getMetrics() { + Map<String, Metric> metrics = new HashMap<>(15); + metrics.put("actual", actual); + metrics.put("completed", completed ); + metrics.put("desired", desired); + metrics.put("failed", failed); + metrics.put("limitsExceeded", limitsExceeded); + metrics.put("nodeFailed", nodeFailed); + metrics.put("preempted", preempted); + metrics.put("pendingAntiAffineRequests", pendingAntiAffineRequests); + metrics.put("releasing", releasing); + metrics.put("requested", requested); + metrics.put("preempted", preempted); + metrics.put("releasing", releasing ); + metrics.put("requested", requested); + metrics.put("started", started); + metrics.put("startFailed", startFailed); + metrics.put("totalRequested", totalRequested); + + metrics.put("outstandingAArequest", + new BoolMetricPredicate(new BoolMetricPredicate.Eval() { + @Override + public boolean eval() { + return isAARequestOutstanding(); + } + })); + return metrics; + } + public String getName() { return name; } @@ -157,11 +190,11 @@ public final class RoleStatus implements Cloneable { } /** - * Get the request count. For AA roles, this includes pending ones. + * Get the request count. 
* @return a count of requested containers */ public long getRequested() { - return requested.get() + pendingAntiAffineRequests.get(); + return requested.get(); } public long incRequested() { @@ -222,6 +255,14 @@ public final class RoleStatus implements Cloneable { } /** + * expose the predicate {@link #isAARequestOutstanding()} as an integer, + * which is very convenient in tests + * @return 1 if there is an outstanding request; 0 if not + */ + public int getOutstandingAARequestCount() { + return isAARequestOutstanding()? 1: 0; + } + /** * Note that a role failed, text will * be used in any diagnostics if an exception * is later raised. @@ -350,7 +391,6 @@ public final class RoleStatus implements Cloneable { */ public long getDelta() { long inuse = getActualAndRequested(); - //don't know how to view these. Are they in-use or not? long delta = desired.get() - inuse; if (delta < 0) { //if we are releasing, remove the number that are already released. @@ -366,7 +406,7 @@ public final class RoleStatus implements Cloneable { * @return the size of the application when outstanding requests are included. */ public long getActualAndRequested() { - return actual.get() + requested.get() + pendingAntiAffineRequests.get(); + return actual.get() + requested.get(); } @Override @@ -499,7 +539,7 @@ public final class RoleStatus implements Cloneable { public synchronized RoleStatistics getStatistics() { RoleStatistics stats = new RoleStatistics(); - stats.activeAA = isAARequestOutstanding() ? 
1: 0; + stats.activeAA = getOutstandingAARequestCount(); stats.actual = actual.get(); stats.desired = desired.get(); stats.failed = failed.get(); http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/830864ff/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateAAOvercapacity.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateAAOvercapacity.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateAAOvercapacity.groovy new file mode 100644 index 0000000..7728748 --- /dev/null +++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateAAOvercapacity.groovy @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.slider.server.appmaster.model.appstate + +import groovy.transform.CompileStatic +import groovy.util.logging.Slf4j +import org.apache.hadoop.yarn.api.records.Container +import org.apache.hadoop.yarn.api.records.ContainerId +import org.apache.hadoop.yarn.api.records.NodeState +import org.apache.hadoop.yarn.client.api.AMRMClient +import org.apache.slider.core.main.LauncherExitCodes +import org.apache.slider.server.appmaster.model.mock.MockNodeReport +import org.apache.slider.server.appmaster.model.mock.MockRoles +import org.apache.slider.server.appmaster.model.mock.MockYarnEngine +import org.apache.slider.server.appmaster.operations.AbstractRMOperation +import org.apache.slider.server.appmaster.state.AppState +import org.apache.slider.server.appmaster.state.ContainerAssignment +import org.apache.slider.server.appmaster.state.NodeInstance +import org.apache.slider.server.appmaster.state.NodeMap +import org.apache.slider.server.appmaster.state.RoleInstance +import org.junit.Test + +/** + * Test Anti-affine placement with a cluster of size 1 + */ +@CompileStatic +@Slf4j +class TestMockAppStateAAOvercapacity extends BaseMockAppStateAATest + implements MockRoles { + + private int NODES = 1 + + @Override + MockYarnEngine createYarnEngine() { + new MockYarnEngine(NODES, 1) + } + + void assertAllContainersAA() { + assertAllContainersAA(aaRole.key) + } + + /** + * + * @throws Throwable + */ + @Test + public void testOvercapacityRecovery() throws Throwable { + + describe("Ask for 1 more than the no of available nodes;" + + "verify the state. 
kill the allocated container and review") + //more than expected + long desired = 3 + aaRole.desired = desired + assert appState.roleHistory.canPlaceAANodes() + + //first request + List<AbstractRMOperation > operations = appState.reviewRequestAndReleaseNodes() + assert aaRole.AARequestOutstanding + assert desired - 1 == aaRole.pendingAntiAffineRequests + List<AbstractRMOperation > operationsOut = [] + // allocate and re-submit + def instances = submitOperations(operations, [], operationsOut) + assert 1 == instances.size() + assertAllContainersAA() + + // expect an outstanding AA request to be unsatisfied + assert aaRole.actual < aaRole.desired + assert !aaRole.requested + assert !aaRole.AARequestOutstanding + assert desired - 1 == aaRole.pendingAntiAffineRequests + List<Container> allocatedContainers = engine.execute(operations, []) + assert 0 == allocatedContainers.size() + + // now lets trigger a failure + def nodemap = cloneNodemap() + assert nodemap.size() == 1 + + def instance = instances[0] + def cid = instance.containerId + + AppState.NodeCompletionResult result = appState.onCompletedNode(containerStatus(cid, + LauncherExitCodes.EXIT_TASK_LAUNCH_FAILURE)) + assert result.containerFailed + + assert aaRole.failed == 1 + assert aaRole.actual == 0 + def availablePlacements = appState.getRoleHistory().findNodeForNewAAInstance(aaRole) + assert availablePlacements.size() == 1 + describe "expecting a successful review with available placements of $availablePlacements" + operations = appState.reviewRequestAndReleaseNodes() + assert operations.size() == 1 + } + + } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/830864ff/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateAAPlacement.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateAAPlacement.groovy 
b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateAAPlacement.groovy index 749e4fc..3461e23 100644 --- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateAAPlacement.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateAAPlacement.groovy @@ -113,7 +113,9 @@ class TestMockAppStateAAPlacement extends BaseMockAppStateAATest aaRole.desired = 2 List<AbstractRMOperation> ops = appState.reviewRequestAndReleaseNodes() getSingleRequest(ops) + assert aaRole.requested == 1 assert aaRole.pendingAntiAffineRequests == 1 + assert aaRole.actualAndRequested + aaRole.pendingAntiAffineRequests == aaRole.desired // now trigger that flex up aaRole.desired = 3 @@ -121,6 +123,13 @@ class TestMockAppStateAAPlacement extends BaseMockAppStateAATest // expect: no new reqests, pending count ++ List<AbstractRMOperation> ops2 = appState.reviewRequestAndReleaseNodes() assert ops2.empty + assert aaRole.actual + aaRole.pendingAntiAffineRequests + aaRole.outstandingAARequestCount == + aaRole.desired + + // 1 outstanding + assert aaRole.actual == 0 + assert aaRole.AARequestOutstanding + // and one AA assert aaRole.pendingAntiAffineRequests == 2 assertAllContainersAA() @@ -141,7 +150,7 @@ class TestMockAppStateAAPlacement extends BaseMockAppStateAATest } @Test - public void testAllocateFlexDown() throws Throwable { + public void testAllocateFlexDownDecrementsPending() throws Throwable { // want multiple instances, so there will be iterations aaRole.desired = 2 List<AbstractRMOperation> ops = appState.reviewRequestAndReleaseNodes() http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/830864ff/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryContainerEvents.groovy ---------------------------------------------------------------------- diff --git 
a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryContainerEvents.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryContainerEvents.groovy index c8a82bd..ca42546 100644 --- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryContainerEvents.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryContainerEvents.groovy @@ -28,12 +28,10 @@ import org.apache.hadoop.yarn.api.records.Priority import org.apache.hadoop.yarn.api.records.Resource import org.apache.hadoop.yarn.client.api.AMRMClient import org.apache.slider.api.ResourceKeys -import org.apache.slider.providers.ProviderRole import org.apache.slider.server.appmaster.model.mock.BaseMockAppStateTest import org.apache.slider.server.appmaster.model.mock.MockContainer import org.apache.slider.server.appmaster.model.mock.MockFactory import org.apache.slider.server.appmaster.model.mock.MockNodeId -import org.apache.slider.server.appmaster.model.mock.MockRoleHistory import org.apache.slider.server.appmaster.state.* import org.junit.Test @@ -402,7 +400,7 @@ class TestRoleHistoryContainerEvents extends BaseMockAppStateTest { int startSize = nodemap.size() // now send a list of updated (failed) nodes event - List<NodeReport> nodesUpdated = new ArrayList<NodeReport>(); + List<NodeReport> nodesUpdated = new ArrayList<>(); NodeReport nodeReport = NodeReport.newInstance( NodeId.newInstance(hostname, 0), NodeState.LOST, http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/830864ff/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/BaseMockAppStateTest.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/BaseMockAppStateTest.groovy 
b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/BaseMockAppStateTest.groovy index da1bcb9..a53e0be 100644 --- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/BaseMockAppStateTest.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/BaseMockAppStateTest.groovy @@ -401,20 +401,6 @@ abstract class BaseMockAppStateTest extends SliderTestBase implements MockRoles } /** - * Scan through all containers and assert that the assignment is AA - * @param index role index - */ - void assertAllContainersAAOld(String index) { - def nodemap = stateAccess.nodeInformationSnapshot - nodemap.each { name, info -> - def nodeEntry = info.entries[index] - assert nodeEntry == null || - (nodeEntry.live - nodeEntry.releasing + nodeEntry.starting) <= 1, - "too many instances on node $name" - } - } - - /** * Get the node information as a large JSON String * @return */
