TEZ-2972. Avoid task rescheduling when a node turns unhealthy (jlowe)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4ed7d1ab Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4ed7d1ab Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4ed7d1ab Branch: refs/heads/TEZ-2980 Commit: 4ed7d1abafa9aab7636c1febcb1a63ea63fde9c8 Parents: c34c54f Author: Jason Lowe <[email protected]> Authored: Fri Dec 18 21:01:36 2015 +0000 Committer: Jason Lowe <[email protected]> Committed: Fri Dec 18 21:01:36 2015 +0000 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../apache/tez/dag/api/TezConfiguration.java | 13 ++++++ .../apache/tez/dag/app/rm/node/AMNodeImpl.java | 16 ++++--- .../tez/dag/app/rm/node/AMNodeTracker.java | 10 ++++- .../dag/app/rm/node/PerSourceNodeTracker.java | 8 +++- .../tez/dag/app/rm/node/TestAMNodeTracker.java | 47 ++++++++++++++++++++ 6 files changed, 86 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/4ed7d1ab/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 0ad2203..a3b0fa6 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -8,6 +8,7 @@ INCOMPATIBLE CHANGES TEZ-2948. Stop using dagName in the dagComplete notification to TaskCommunicators. TEZ-2949. Allow duplicate dag names within session for Tez. TEZ-604. Revert temporary changes made in TEZ-603 to kill the provided tez session, if running a MapReduce job. + TEZ-2972. Avoid task rescheduling when a node turns unhealthy ALL CHANGES: TEZ-3011. Link Vertex Name in Dag Tasks/Task Attempts to Vertex @@ -90,6 +91,7 @@ ALL CHANGES: TEZ-2866. Tez UI: Newly added columns wont be displayed by default in tables TEZ-2887. Tez build failure due to missing dependency in pom files. TEZ-1692. Reduce code duplication between TezMapredSplitsGrouper and TezMapreduceSplitsGrouper. + TEZ-2972. Avoid task rescheduling when a node turns unhealthy Release 0.8.1-alpha: 2015-10-12 http://git-wip-us.apache.org/repos/asf/tez/blob/4ed7d1ab/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index fabc256..b707857 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -569,6 +569,19 @@ public class TezConfiguration extends Configuration { + "node-blacklisting.ignore-threshold-node-percent"; public static final int TEZ_AM_NODE_BLACKLISTING_IGNORE_THRESHOLD_DEFAULT = 33; + /** + * Boolean value. Enable task rescheduling for node updates. + * When enabled the task scheduler will reschedule task attempts that + * are associated with an unhealthy node to avoid potential data transfer + * errors from downstream tasks. + */ + @ConfigurationScope(Scope.AM) + @ConfigurationProperty(type="boolean") + public static final String TEZ_AM_NODE_UNHEALTHY_RESCHEDULE_TASKS = + TEZ_AM_PREFIX + "node-unhealthy-reschedule-tasks"; + public static final boolean + TEZ_AM_NODE_UNHEALTHY_RESCHEDULE_TASKS_DEFAULT = false; + /** Int value. Number of threads to handle client RPC requests. Expert level setting.*/ @ConfigurationScope(Scope.AM) @ConfigurationProperty(type="integer") http://git-wip-us.apache.org/repos/asf/tez/blob/4ed7d1ab/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java index 18d5978..bcc38c6 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java @@ -59,6 +59,7 @@ public class AMNodeImpl implements AMNode { private final int maxTaskFailuresPerNode; private boolean blacklistingEnabled; private boolean ignoreBlacklisting = false; + private boolean nodeUpdatesRescheduleEnabled; private Set<TezTaskAttemptID> failedAttemptIds = Sets.newHashSet(); @SuppressWarnings("rawtypes") @@ -175,7 +176,7 @@ public class AMNodeImpl implements AMNode { @SuppressWarnings("rawtypes") public AMNodeImpl(NodeId nodeId, int schedulerId, int maxTaskFailuresPerNode, EventHandler eventHandler, boolean blacklistingEnabled, - AppContext appContext) { + boolean rescheduleOnUnhealthyNode, AppContext appContext) { ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); this.readLock = rwLock.readLock(); this.writeLock = rwLock.writeLock(); @@ -184,6 +185,7 @@ public class AMNodeImpl implements AMNode { this.appContext = appContext; this.eventHandler = eventHandler; this.blacklistingEnabled = blacklistingEnabled; + this.nodeUpdatesRescheduleEnabled = rescheduleOnUnhealthyNode; this.maxTaskFailuresPerNode = maxTaskFailuresPerNode; this.stateMachine = stateMachineFactory.make(this); // TODO Handle the case where a node is created due to the RM reporting it's @@ -323,12 +325,14 @@ public class AMNodeImpl implements AMNode { SingleArcTransition<AMNodeImpl, AMNodeEvent> { @Override public void transition(AMNodeImpl node, AMNodeEvent nEvent) { - for (ContainerId c : node.containers) { - node.sendEvent(new AMContainerEventNodeFailed(c, "Node failed")); + if (node.nodeUpdatesRescheduleEnabled) { + for (ContainerId c : node.containers) { + node.sendEvent(new AMContainerEventNodeFailed(c, "Node failed")); + } + // Resetting counters. + node.numFailedTAs = 0; + node.numSuccessfulTAs = 0; } - // Resetting counters. - node.numFailedTAs = 0; - node.numSuccessfulTAs = 0; } } http://git-wip-us.apache.org/repos/asf/tez/blob/4ed7d1ab/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java index 1aa8472..fdc8a4c 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java @@ -50,6 +50,7 @@ public class AMNodeTracker extends AbstractService implements private int maxTaskFailuresPerNode; private boolean nodeBlacklistingEnabled; private int blacklistDisablePercent; + private boolean nodeUpdatesRescheduleEnabled; @SuppressWarnings("rawtypes") public AMNodeTracker(EventHandler eventHandler, AppContext appContext) { @@ -70,10 +71,14 @@ public class AMNodeTracker extends AbstractService implements this.blacklistDisablePercent = conf.getInt( TezConfiguration.TEZ_AM_NODE_BLACKLISTING_IGNORE_THRESHOLD, TezConfiguration.TEZ_AM_NODE_BLACKLISTING_IGNORE_THRESHOLD_DEFAULT); + this.nodeUpdatesRescheduleEnabled = conf.getBoolean( + TezConfiguration.TEZ_AM_NODE_UNHEALTHY_RESCHEDULE_TASKS, + TezConfiguration.TEZ_AM_NODE_UNHEALTHY_RESCHEDULE_TASKS_DEFAULT); LOG.info("blacklistDisablePercent is " + blacklistDisablePercent + ", blacklistingEnabled: " + nodeBlacklistingEnabled + - ", maxTaskFailuresPerNode: " + maxTaskFailuresPerNode); + ", maxTaskFailuresPerNode: " + maxTaskFailuresPerNode + + ", nodeUpdatesRescheduleEnabled: " + nodeUpdatesRescheduleEnabled); if (blacklistDisablePercent < -1 || blacklistDisablePercent > 100) { throw new TezUncheckedException("Invalid blacklistDisablePercent: " @@ -143,7 +148,8 @@ public class AMNodeTracker extends AbstractService implements if (nodeTracker == null) { nodeTracker = new PerSourceNodeTracker(schedulerId, eventHandler, appContext, maxTaskFailuresPerNode, - nodeBlacklistingEnabled, blacklistDisablePercent); + nodeBlacklistingEnabled, blacklistDisablePercent, + nodeUpdatesRescheduleEnabled); PerSourceNodeTracker old = perSourceNodeTrackers.putIfAbsent(schedulerId, nodeTracker); nodeTracker = old != null ? old : nodeTracker; } http://git-wip-us.apache.org/repos/asf/tez/blob/4ed7d1ab/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java index b1c81af..72c3230 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java @@ -42,6 +42,7 @@ public class PerSourceNodeTracker { private final int maxTaskFailuresPerNode; private final boolean nodeBlacklistingEnabled; private final int blacklistDisablePercent; + private final boolean nodeUpdatesRescheduleEnabled; private int numClusterNodes; float currentIgnoreBlacklistingCountThreshold = 0; @@ -50,7 +51,8 @@ public class PerSourceNodeTracker { @SuppressWarnings("rawtypes") public PerSourceNodeTracker(int sourceId, EventHandler eventHandler, AppContext appContext, int maxTaskFailuresPerNode, boolean nodeBlacklistingEnabled, - int blacklistDisablePercent) { + int blacklistDisablePercent, + boolean nodeUpdatesRescheduleEnabled) { this.sourceId = sourceId; this.nodeMap = new ConcurrentHashMap<>(); this.blacklistMap = new ConcurrentHashMap<>(); @@ -60,13 +62,15 @@ public class PerSourceNodeTracker { this.maxTaskFailuresPerNode = maxTaskFailuresPerNode; this.nodeBlacklistingEnabled = nodeBlacklistingEnabled; this.blacklistDisablePercent = blacklistDisablePercent; + this.nodeUpdatesRescheduleEnabled = nodeUpdatesRescheduleEnabled; } public void nodeSeen(NodeId nodeId) { if (nodeMap.putIfAbsent(nodeId, new AMNodeImpl(nodeId, sourceId, maxTaskFailuresPerNode, - eventHandler, nodeBlacklistingEnabled, appContext)) == null) { + eventHandler, nodeBlacklistingEnabled, nodeUpdatesRescheduleEnabled, + appContext)) == null) { LOG.info("Adding new node {} to nodeTracker {}", nodeId, sourceId); } } http://git-wip-us.apache.org/repos/asf/tez/blob/4ed7d1ab/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java index 143fcbf..25d1784 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java @@ -326,6 +326,53 @@ public class TestAMNodeTracker { } } + @Test(timeout=10000) + public void testNodeUnhealthyRescheduleTasksEnabled() throws Exception { + _testNodeUnhealthyRescheduleTasks(true); + } + + @Test(timeout=10000) + public void testNodeUnhealthyRescheduleTasksDisabled() throws Exception { + _testNodeUnhealthyRescheduleTasks(false); + } + + private void _testNodeUnhealthyRescheduleTasks(boolean rescheduleTasks) { + AppContext appContext = mock(AppContext.class); + Configuration conf = new Configuration(false); + conf.setBoolean(TezConfiguration.TEZ_AM_NODE_UNHEALTHY_RESCHEDULE_TASKS, + rescheduleTasks); + TestEventHandler handler = new TestEventHandler(); + AMNodeTracker amNodeTracker = new AMNodeTracker(handler, appContext); + doReturn(amNodeTracker).when(appContext).getNodeTracker(); + amNodeTracker.init(conf); + amNodeTracker.start(); + + // add a node + amNodeTracker.handle(new AMNodeEventNodeCountUpdated(1, 0)); + NodeId nodeId = NodeId.newInstance("host1", 1234); + amNodeTracker.nodeSeen(nodeId, 0); + AMNodeImpl node = (AMNodeImpl) amNodeTracker.get(nodeId, 0); + + // simulate task starting on node + ContainerId cid = mock(ContainerId.class); + amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, 0, cid)); + + // mark node unhealthy + NodeReport nodeReport = generateNodeReport(nodeId, NodeState.UNHEALTHY); + amNodeTracker.handle(new AMNodeEventStateChanged(nodeReport, 0)); + assertEquals(AMNodeState.UNHEALTHY, node.getState()); + + // check for task rescheduling events + if (rescheduleTasks) { + assertEquals(1, handler.events.size()); + assertEquals(AMContainerEventType.C_NODE_FAILED, handler.events.get(0).getType()); + } else { + assertEquals(0, handler.events.size()); + } + + amNodeTracker.stop(); + } + private void _testSingleNodeNotBlacklisted(AMNodeTracker amNodeTracker, TestEventHandler handler, int schedulerId) { amNodeTracker.handle(new AMNodeEventNodeCountUpdated(1, schedulerId));
