Repository: incubator-apex-core Updated Branches: refs/heads/master 854966439 -> 0f1e2bb03
APEXCORE-393 #resolve Adding DAG context attributes with increased default value for blacklisting of failed nodes. Added resetting of failure count for nodes after blacklist removal interval. Cleaned up code to remove concurrent Map for maintaining failed counts. Reduced checkstyle violation count in engine pom.xml. Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/0f1e2bb0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/0f1e2bb0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/0f1e2bb0 Branch: refs/heads/master Commit: 0f1e2bb038fbf66955990e9ff8772bef4ac2d874 Parents: 8549664 Author: ishark <[email protected]> Authored: Wed Mar 16 17:39:19 2016 -0700 Committer: ishark <[email protected]> Committed: Tue Mar 22 18:28:14 2016 -0700 ---------------------------------------------------------------------- .../main/java/com/datatorrent/api/Context.java | 16 ++++ engine/pom.xml | 2 +- .../stram/StreamingAppMasterService.java | 90 +++++++++++++------- 3 files changed, 76 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/0f1e2bb0/api/src/main/java/com/datatorrent/api/Context.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/com/datatorrent/api/Context.java b/api/src/main/java/com/datatorrent/api/Context.java index d34d682..ee90100 100644 --- a/api/src/main/java/com/datatorrent/api/Context.java +++ b/api/src/main/java/com/datatorrent/api/Context.java @@ -495,6 +495,22 @@ public interface Context * Only supports string codecs that have a constructor with no arguments */ Attribute<Map<Class<?>, Class<? extends StringCodec<?>>>> STRING_CODECS = new Attribute<Map<Class<?>, Class<? extends StringCodec<?>>>>(new Map2String<Class<?>, Class<? 
extends StringCodec<?>>>(",", "=", new Class2String<Object>(), new Class2String<StringCodec<?>>())); + + /** + * The number of consecutive container failures that should lead to + * blacklisting of nodes by application master + * Blacklisting for nodes is disabled for the default value + */ + Attribute<Integer> MAX_CONSECUTIVE_CONTAINER_FAILURES_FOR_BLACKLIST = new Attribute<Integer>(Integer.MAX_VALUE); + + /** + * The amount of time to wait before removing failed nodes from blacklist + */ + Attribute<Long> BLACKLISTED_NODE_REMOVAL_TIME_MILLIS = new Attribute<Long>(new Long(60 * 60 * 1000)); + + /** + * The number of times consecutive container failure + */ @SuppressWarnings(value = "FieldNameHidesFieldInSuperclass") long serialVersionUID = AttributeMap.AttributeInitializer.initialize(DAGContext.class); } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/0f1e2bb0/engine/pom.xml ---------------------------------------------------------------------- diff --git a/engine/pom.xml b/engine/pom.xml index f13c2f2..f968686 100644 --- a/engine/pom.xml +++ b/engine/pom.xml @@ -149,7 +149,7 @@ <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-checkstyle-plugin</artifactId> <configuration> - <maxAllowedViolations>3173</maxAllowedViolations> + <maxAllowedViolations>3161</maxAllowedViolations> <logViolationsToConsole>${checkstyle.console}</logViolationsToConsole> </configuration> </plugin> http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/0f1e2bb0/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java index 31a7fc8..fbc0b36 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java @@ 
-31,7 +31,7 @@ import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Queue; +import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; @@ -82,13 +82,13 @@ import org.apache.hadoop.yarn.webapp.WebApp; import org.apache.hadoop.yarn.webapp.WebApps; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import com.datatorrent.api.Attribute; import com.datatorrent.api.AutoMetric; import com.datatorrent.api.Context.DAGContext; import com.datatorrent.api.DAG; import com.datatorrent.api.StringCodec; -import com.datatorrent.common.util.Pair; import com.datatorrent.stram.StreamingContainerManager.ContainerResource; import com.datatorrent.stram.api.AppDataSource; import com.datatorrent.stram.api.BaseContext; @@ -123,8 +123,6 @@ public class StreamingAppMasterService extends CompositeService private static final long DELEGATION_TOKEN_RENEW_INTERVAL = Long.MAX_VALUE / 2; private static final long DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL = 24 * 60 * 60 * 1000; private static final int NUMBER_MISSED_HEARTBEATS = 30; - private static final int MAX_CONTAINER_FAILURES_PER_NODE = 3; - private static final long BLACKLIST_REMOVAL_TIME = 60 * 60 * 1000; private AMRMClient<ContainerRequest> amRmClient; private NMClientAsync nmClient; private LogicalPlan dag; @@ -140,8 +138,10 @@ public class StreamingAppMasterService extends CompositeService private final AtomicInteger numCompletedContainers = new AtomicInteger(); // Containers that the RM has allocated to us private final ConcurrentMap<String, AllocatedContainer> allocatedContainers = Maps.newConcurrentMap(); - private final ConcurrentMap<String, AtomicInteger> failedContainersMap = Maps.newConcurrentMap(); - private final Queue<Pair<Long, List<String>>> blacklistedNodesQueueWithTimeStamp = new ConcurrentLinkedQueue<Pair<Long, List<String>>>(); 
+ // Set of nodes marked blacklisted due to consecutive container failures on the nodes + private final Set<String> failedBlackListedNodes = Sets.newHashSet(); + // Maintains max consecutive failures stats for nodes for blacklisting failing nodes + private final Map<String, NodeFailureStats> failedContainerNodesMap = Maps.newHashMap(); // Count of failed containers private final AtomicInteger numFailedContainers = new AtomicInteger(); private final ConcurrentLinkedQueue<Runnable> pendingTasks = new ConcurrentLinkedQueue<Runnable>(); @@ -161,6 +161,19 @@ public class StreamingAppMasterService extends CompositeService this.appAttemptID = appAttemptID; } + private class NodeFailureStats + { + long lastFailureTimeStamp; + int failureCount; + long blackListAdditionTime; + + public NodeFailureStats(long lastFailureTimeStamp, int failureCount) + { + this.lastFailureTimeStamp = lastFailureTimeStamp; + this.failureCount = failureCount; + } + } + /** * Overrides getters to pull live info. */ @@ -671,9 +684,9 @@ public class StreamingAppMasterService extends CompositeService int minVcores = conf.getInt("yarn.scheduler.minimum-allocation-vcores", 0); LOG.info("Max mem {}m, Min mem {}m, Max vcores {} and Min vcores {} capabililty of resources in this cluster ", maxMem, minMem, maxVcores, minVcores); - int maxConsecutiveContainerFailures = conf.getInt("MAX_CONSECUTIVE_CONTAINER_FAILURES", MAX_CONTAINER_FAILURES_PER_NODE); - long blacklistRemovalTime = conf.getLong("BLACKLIST_REMOVAL_TIME", BLACKLIST_REMOVAL_TIME); - + long blacklistRemovalTime = dag.getValue(DAGContext.BLACKLISTED_NODE_REMOVAL_TIME_MILLIS); + int maxConsecutiveContainerFailures = dag.getValue(DAGContext.MAX_CONSECUTIVE_CONTAINER_FAILURES_FOR_BLACKLIST); + LOG.info("Blacklist removal time in millis = {}, max consecutive node failure count = {}", blacklistRemovalTime, maxConsecutiveContainerFailures); // for locality relaxation fall back Map<StreamingContainerAgent.ContainerStartRequest, MutablePair<Integer, 
ContainerRequest>> requestedResources = Maps.newHashMap(); @@ -805,18 +818,17 @@ public class StreamingAppMasterService extends CompositeService /* Remove nodes from blacklist after timeout */ long currentTime = System.currentTimeMillis(); List<String> blacklistRemovals = new ArrayList<String>(); - for (Iterator<Pair<Long, List<String>>> it = blacklistedNodesQueueWithTimeStamp.iterator(); it.hasNext();) { - Pair<Long, List<String>> entry = it.next(); - Long timeDiff = currentTime - entry.getFirst(); - if (timeDiff > blacklistRemovalTime) { - blacklistRemovals.addAll(entry.getSecond()); - it.remove(); - } else { - break; + for (String hostname : failedBlackListedNodes) { + Long timeDiff = currentTime - failedContainerNodesMap.get(hostname).blackListAdditionTime; + if (timeDiff >= blacklistRemovalTime) { + blacklistRemovals.add(hostname); + failedContainerNodesMap.remove(hostname); } } if (!blacklistRemovals.isEmpty()) { amRmClient.updateBlacklist(null, blacklistRemovals); + LOG.info("Removing nodes {} from blacklist: time elapsed since last blacklisting due to failure is greater than specified timeout", blacklistRemovals.toString()); + failedBlackListedNodes.removeAll(blacklistRemovals); } numTotalContainers += containerRequests.size(); @@ -922,18 +934,30 @@ public class StreamingAppMasterService extends CompositeService if (0 != exitStatus) { if (allocatedContainer != null) { numFailedContainers.incrementAndGet(); - if (exitStatus != 1) { + if (exitStatus != 1 && maxConsecutiveContainerFailures != Integer.MAX_VALUE) { // If container failure due to framework String hostname = allocatedContainer.container.getNodeId().getHost(); - int failedTimes = 1; - AtomicInteger failed = failedContainersMap.putIfAbsent(hostname, new AtomicInteger(1)); - if (failed != null) { - failedTimes = failed.incrementAndGet(); - } - if (failedTimes >= maxConsecutiveContainerFailures) { - // Blacklist the node - LOG.info("Node {} failed {} times consecutively, marking the node blacklisted", 
hostname, failedTimes); - blacklistAdditions.add(hostname); + if (!failedBlackListedNodes.contains(hostname)) { + // Blacklist the node if not already blacklisted + if (failedContainerNodesMap.containsKey(hostname)) { + NodeFailureStats stats = failedContainerNodesMap.get(hostname); + long timeStamp = System.currentTimeMillis(); + if (timeStamp - stats.lastFailureTimeStamp >= blacklistRemovalTime) { + // Reset failure count if last failure was before Blacklist removal time + stats.failureCount = 1; + stats.lastFailureTimeStamp = timeStamp; + } else { + stats.lastFailureTimeStamp = timeStamp; + stats.failureCount++; + if (stats.failureCount >= maxConsecutiveContainerFailures) { + LOG.info("Node {} failed {} times consecutively within {} minutes, marking the node blacklisted", hostname, stats.failureCount, blacklistRemovalTime / (60 * 1000)); + blacklistAdditions.add(hostname); + failedBlackListedNodes.add(hostname); + } + } + } else { + failedContainerNodesMap.put(hostname, new NodeFailureStats(System.currentTimeMillis(), 1)); + } } } } @@ -957,9 +981,9 @@ public class StreamingAppMasterService extends CompositeService LOG.info("Container completed successfully." 
+ ", containerId=" + containerStatus.getContainerId()); // Reset counter for node failure, if exists String hostname = allocatedContainer.container.getNodeId().getHost(); - AtomicInteger failedTimes = failedContainersMap.get(hostname); - if(failedTimes != null) { - failedTimes.set(0); + NodeFailureStats stats = failedContainerNodesMap.get(hostname); + if (stats != null) { + stats.failureCount = 0; } } @@ -974,7 +998,11 @@ public class StreamingAppMasterService extends CompositeService if (!blacklistAdditions.isEmpty()) { amRmClient.updateBlacklist(blacklistAdditions, null); - blacklistedNodesQueueWithTimeStamp.add(new Pair<Long, List<String>>(System.currentTimeMillis(), blacklistAdditions)); + long timeStamp = System.currentTimeMillis(); + for (String hostname : blacklistAdditions) { + NodeFailureStats stats = failedContainerNodesMap.get(hostname); + stats.blackListAdditionTime = timeStamp; + } } if (dnmgr.forcedShutdown) { LOG.info("Forced shutdown due to {}", dnmgr.shutdownDiagnosticsMessage);
