Repository: incubator-apex-core Updated Branches: refs/heads/master 832901c26 -> 712138ac4
APEX-92 #Comment #resolve Fix for adding failed nodes to blacklist, after failure is observed for the same node more than MAX number of tries Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/d8e1e74d Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/d8e1e74d Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/d8e1e74d Branch: refs/heads/master Commit: d8e1e74da00619a3c0e021ecd167de57fcc0e262 Parents: 90bda5e Author: ishark <[email protected]> Authored: Wed Sep 23 16:17:04 2015 -0700 Committer: ishark <[email protected]> Committed: Fri Sep 25 16:09:54 2015 -0700 ---------------------------------------------------------------------- engine/pom.xml | 2 +- .../stram/StreamingAppMasterService.java | 83 ++++++++++++++------ 2 files changed, 62 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d8e1e74d/engine/pom.xml ---------------------------------------------------------------------- diff --git a/engine/pom.xml b/engine/pom.xml index 32f2001..15e0565 100644 --- a/engine/pom.xml +++ b/engine/pom.xml @@ -145,7 +145,7 @@ <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-checkstyle-plugin</artifactId> <configuration> - <maxAllowedViolations>2248</maxAllowedViolations> + <maxAllowedViolations>2238</maxAllowedViolations> </configuration> </plugin> </plugins> http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d8e1e74d/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java index bfeedbd..5d84e10 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java @@ -32,9 +32,6 @@ import javax.xml.bind.annotation.XmlElement; import com.google.common.collect.Maps; import org.apache.commons.io.FileUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import org.apache.commons.lang.StringUtils; import org.apache.commons.lang3.tuple.MutablePair; import org.apache.hadoop.conf.Configuration; @@ -63,13 +60,15 @@ import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.webapp.WebApp; import org.apache.hadoop.yarn.webapp.WebApps; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.datatorrent.api.Attribute; import com.datatorrent.api.AutoMetric; import com.datatorrent.api.Context.DAGContext; import com.datatorrent.api.DAG; import com.datatorrent.api.StringCodec; - +import com.datatorrent.common.util.Pair; import com.datatorrent.stram.StreamingContainerManager.ContainerResource; import com.datatorrent.stram.api.AppDataSource; import com.datatorrent.stram.api.BaseContext; @@ -103,11 +102,13 @@ public class StreamingAppMasterService extends CompositeService private static final long DELEGATION_TOKEN_RENEW_INTERVAL = Long.MAX_VALUE / 2; private static final long DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL = 24 * 60 * 60 * 1000; private static final int NUMBER_MISSED_HEARTBEATS = 30; + private static final int MAX_CONTAINER_FAILURES_PER_NODE = 3; + private static final long BLACKLIST_REMOVAL_TIME = 60 * 60 * 1000; private AMRMClient<ContainerRequest> amRmClient; private NMClientAsync nmClient; private LogicalPlan dag; // Application Attempt Id ( combination of attemptId and fail count ) - final private ApplicationAttemptId appAttemptID; + private final ApplicationAttemptId appAttemptID; // Hostname of the container private final String appMasterHostname = ""; // Tracking url to which app master publishes info for clients to monitor @@ -118,6 +119,8 @@ public class StreamingAppMasterService extends CompositeService private final AtomicInteger numCompletedContainers = new AtomicInteger(); // Containers that the RM has allocated to us private final ConcurrentMap<String, AllocatedContainer> allocatedContainers = Maps.newConcurrentMap(); + private final ConcurrentMap<String, AtomicInteger> failedContainersMap = Maps.newConcurrentMap(); + private final Queue<Pair<Long, List<String>>> blacklistedNodesQueueWithTimeStamp = new ConcurrentLinkedQueue<Pair<Long, List<String>>>(); // Count of failed containers private final AtomicInteger numFailedContainers = new AtomicInteger(); private final ConcurrentLinkedQueue<Runnable> pendingTasks = new ConcurrentLinkedQueue<Runnable>(); @@ -279,8 +282,7 @@ public class StreamingAppMasterService extends CompositeService if (c.getExternalId() == null || c.getState() == PTContainer.State.KILLED) { if (c.getRequiredVCores() == 0) { result++; - } - else { + } else { result += c.getRequiredVCores(); } } @@ -475,15 +477,12 @@ public class StreamingAppMasterService extends CompositeService LOG.info("System CWD content: " + line); } LOG.info("Dumping files in local dir: end"); - } - finally { + } finally { buf.close(); } - } - catch (IOException e) { + } catch (IOException e) { LOG.debug("Exception", e); - } - catch (InterruptedException e) { + } catch (InterruptedException e) { LOG.info("Interrupted", e); } @@ -492,11 +491,9 @@ public class StreamingAppMasterService extends CompositeService try { // find a better way of logging this using the logger. Configuration.dumpConfiguration(getConfig(), new PrintWriter(System.out)); - } - catch (Exception e) { + } catch (Exception e) { LOG.error("Error dumping configuration.", e); } - } @Override @@ -507,8 +504,7 @@ public class StreamingAppMasterService extends CompositeService FileInputStream fis = new FileInputStream("./" + LogicalPlan.SER_FILE_NAME); try { this.dag = LogicalPlan.read(fis); - } - finally { + } finally { fis.close(); } // "debug" simply dumps all data using LOG.info @@ -651,6 +647,9 @@ public class StreamingAppMasterService extends CompositeService int minVcores = conf.getInt("yarn.scheduler.minimum-allocation-vcores", 0); LOG.info("Max mem {}m, Min mem {}m, Max vcores {} and Min vcores {} capabililty of resources in this cluster ", maxMem, minMem, maxVcores, minVcores); + int maxConsecutiveContainerFailures = conf.getInt("MAX_CONSECUTIVE_CONTAINER_FAILURES", MAX_CONTAINER_FAILURES_PER_NODE); + long blacklistRemovalTime = conf.getLong("BLACKLIST_REMOVAL_TIME", BLACKLIST_REMOVAL_TIME); + // for locality relaxation fall back Map<StreamingContainerAgent.ContainerStartRequest, MutablePair<Integer, ContainerRequest>> requestedResources = Maps.newHashMap(); @@ -692,11 +691,9 @@ public class StreamingAppMasterService extends CompositeService return; } resourceRequestor.updateNodeReports(clientRMService.getNodeReports()); - } - catch (Exception e) { + } catch (Exception e) { throw new RuntimeException("Failed to retrieve cluster nodes report.", e); - } - finally { + } finally { clientRMService.stop(); } @@ -781,6 +778,23 @@ public class StreamingAppMasterService extends CompositeService } } + /* Remove nodes from blacklist after timeout */ + long currentTime = System.currentTimeMillis(); + List<String> blacklistRemovals = new ArrayList<String>(); + for (Iterator<Pair<Long, List<String>>> it = blacklistedNodesQueueWithTimeStamp.iterator(); it.hasNext();) { + Pair<Long, List<String>> entry = it.next(); + Long timeDiff = currentTime - entry.getFirst(); + if (timeDiff > blacklistRemovalTime) { + blacklistRemovals.addAll(entry.getSecond()); + it.remove(); + } else { + break; + } + } + if (!blacklistRemovals.isEmpty()) { + amRmClient.updateBlacklist(null, blacklistRemovals); + } + numTotalContainers += containerRequests.size(); numRequestedContainers += containerRequests.size(); AllocateResponse amResp = sendContainerAskToRM(containerRequests, removedContainerRequests, releasedContainers); @@ -868,6 +882,7 @@ public class StreamingAppMasterService extends CompositeService // Check the completed containers List<ContainerStatus> completedContainers = amResp.getCompletedContainersStatuses(); // LOG.debug("Got response from RM for container ask, completedCnt=" + completedContainers.size()); + List<String> blacklistAdditions = new ArrayList<String>(); for (ContainerStatus containerStatus : completedContainers) { LOG.info("Completed containerId=" + containerStatus.getContainerId() + ", state=" + containerStatus.getState() + ", exitStatus=" + containerStatus.getExitStatus() + ", diagnostics=" + containerStatus.getDiagnostics()); @@ -883,6 +898,20 @@ public class StreamingAppMasterService extends CompositeService if (0 != exitStatus) { if (allocatedContainer != null) { numFailedContainers.incrementAndGet(); + if (exitStatus != 1) { + // If container failure due to framework + String hostname = allocatedContainer.container.getNodeId().getHost(); + int failedTimes = 1; + AtomicInteger failed = failedContainersMap.putIfAbsent(hostname, new AtomicInteger(1)); + if (failed != null) { + failedTimes = failed.incrementAndGet(); + } + if (failedTimes >= maxConsecutiveContainerFailures) { + // Blacklist the node + LOG.info("Node {} failed {} times consecutively, marking the node blacklisted", hostname, failedTimes); + blacklistAdditions.add(hostname); + } + } } // if (exitStatus == 1) { // // non-recoverable StreamingContainer failure @@ -902,6 +931,12 @@ public class StreamingAppMasterService extends CompositeService // container completed successfully numCompletedContainers.incrementAndGet(); LOG.info("Container completed successfully." + ", containerId=" + containerStatus.getContainerId()); + // Reset counter for node failure, if exists + String hostname = allocatedContainer.container.getNodeId().getHost(); + AtomicInteger failedTimes = failedContainersMap.get(hostname); + if(failedTimes != null) { + failedTimes.set(0); + } } String containerIdStr = containerStatus.getContainerId().toString(); @@ -913,6 +948,10 @@ public class StreamingAppMasterService extends CompositeService dnmgr.recordEventAsync(ev); } + if (!blacklistAdditions.isEmpty()) { + amRmClient.updateBlacklist(blacklistAdditions, null); + blacklistedNodesQueueWithTimeStamp.add(new Pair<Long, List<String>>(System.currentTimeMillis(), blacklistAdditions)); + } if (dnmgr.forcedShutdown) { LOG.info("Forced shutdown due to {}", dnmgr.shutdownDiagnosticsMessage); finalStatus = FinalApplicationStatus.FAILED;
