Repository: tez Updated Branches: refs/heads/branch-0.7 9880c414c -> d80e30d3a
TEZ-2972. Avoid task rescheduling when a node turns unhealthy (jlowe) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/d80e30d3 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/d80e30d3 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/d80e30d3 Branch: refs/heads/branch-0.7 Commit: d80e30d3af44471c5da98e5e9a1dd08929dc3931 Parents: 9880c41 Author: Jason Lowe <[email protected]> Authored: Tue Jan 5 19:31:35 2016 +0000 Committer: Jason Lowe <[email protected]> Committed: Tue Jan 5 19:31:35 2016 +0000 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../apache/tez/dag/api/TezConfiguration.java | 12 +++++ tez-dag/findbugs-exclude.xml | 1 + .../apache/tez/dag/app/rm/node/AMNodeImpl.java | 16 ++++--- .../tez/dag/app/rm/node/AMNodeTracker.java | 14 ++++-- .../tez/dag/app/rm/node/TestAMNodeTracker.java | 47 ++++++++++++++++++++ 6 files changed, 82 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/d80e30d3/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 4f8897e..c2364a7 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -4,8 +4,10 @@ Apache Tez Change Log INCOMPATIBLE CHANGES TEZ-2679. Admin forms of launch env settings TEZ-2949. Allow duplicate dag names within session for Tez. + TEZ-2972. Avoid task rescheduling when a node turns unhealthy ALL CHANGES + TEZ-2972. Avoid task rescheduling when a node turns unhealthy TEZ-3017. HistoryACLManager does not have a close method for cleanup TEZ-2914. Ability to limit vertex concurrency TEZ-3011. Link Vertex Name in Dag Tasks/Task Attempts to Vertex http://git-wip-us.apache.org/repos/asf/tez/blob/d80e30d3/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index 8997327..d4d5759 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -541,6 +541,18 @@ public class TezConfiguration extends Configuration { + "node-blacklisting.ignore-threshold-node-percent"; public static final int TEZ_AM_NODE_BLACKLISTING_IGNORE_THRESHOLD_DEFAULT = 33; + /** + * Boolean value. Enable task rescheduling for node updates. + * When enabled the task scheduler will reschedule task attempts that + * are associated with an unhealthy node to avoid potential data transfer + * errors from downstream tasks. + */ + @ConfigurationScope(Scope.AM) + public static final String TEZ_AM_NODE_UNHEALTHY_RESCHEDULE_TASKS = + TEZ_AM_PREFIX + "node-unhealthy-reschedule-tasks"; + public static final boolean + TEZ_AM_NODE_UNHEALTHY_RESCHEDULE_TASKS_DEFAULT = false; + /** Int value. Number of threads to handle client RPC requests. Expert level setting.*/ @ConfigurationScope(Scope.AM) public static final String TEZ_AM_CLIENT_THREAD_COUNT = http://git-wip-us.apache.org/repos/asf/tez/blob/d80e30d3/tez-dag/findbugs-exclude.xml ---------------------------------------------------------------------- diff --git a/tez-dag/findbugs-exclude.xml b/tez-dag/findbugs-exclude.xml index 4c01edc..2842a50 100644 --- a/tez-dag/findbugs-exclude.xml +++ b/tez-dag/findbugs-exclude.xml @@ -18,6 +18,7 @@ <Or> <Field name="blacklistDisablePercent" /> <Field name="maxTaskFailuresPerNode" /> + <Field name="nodeUpdatesRescheduleEnabled" /> </Or> <Bug pattern="IS2_INCONSISTENT_SYNC" /> </Match> http://git-wip-us.apache.org/repos/asf/tez/blob/d80e30d3/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java index b93cab3..f4d89e4 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java @@ -58,6 +58,7 @@ public class AMNodeImpl implements AMNode { private final int maxTaskFailuresPerNode; private boolean blacklistingEnabled; private boolean ignoreBlacklisting = false; + private boolean nodeUpdatesRescheduleEnabled; private Set<TezTaskAttemptID> failedAttemptIds = Sets.newHashSet(); @SuppressWarnings("rawtypes") @@ -174,7 +175,7 @@ public class AMNodeImpl implements AMNode { @SuppressWarnings("rawtypes") public AMNodeImpl(NodeId nodeId, int maxTaskFailuresPerNode, EventHandler eventHandler, boolean blacklistingEnabled, - AppContext appContext) { + boolean rescheduleOnUnhealthyNode, AppContext appContext) { ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); this.readLock = rwLock.readLock(); this.writeLock = rwLock.writeLock(); @@ -182,6 +183,7 @@ public class AMNodeImpl implements AMNode { this.appContext = appContext; this.eventHandler = eventHandler; this.blacklistingEnabled = blacklistingEnabled; + this.nodeUpdatesRescheduleEnabled = rescheduleOnUnhealthyNode; this.maxTaskFailuresPerNode = maxTaskFailuresPerNode; this.stateMachine = stateMachineFactory.make(this); // TODO Handle the case where a node is created due to the RM reporting it's @@ -321,12 +323,14 @@ public class AMNodeImpl implements AMNode { SingleArcTransition<AMNodeImpl, AMNodeEvent> { @Override public void transition(AMNodeImpl node, AMNodeEvent nEvent) { - for (ContainerId c : node.containers) { - node.sendEvent(new AMContainerEventNodeFailed(c, "Node failed")); + if (node.nodeUpdatesRescheduleEnabled) { + for (ContainerId c : node.containers) { + node.sendEvent(new AMContainerEventNodeFailed(c, "Node failed")); + } + // Resetting counters. + node.numFailedTAs = 0; + node.numSuccessfulTAs = 0; } - // Resetting counters. - node.numFailedTAs = 0; - node.numSuccessfulTAs = 0; } } http://git-wip-us.apache.org/repos/asf/tez/blob/d80e30d3/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java index a067cee..38c154b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java @@ -53,7 +53,8 @@ public class AMNodeTracker extends AbstractService implements private boolean nodeBlacklistingEnabled; private int blacklistDisablePercent; float currentIgnoreBlacklistingCountThreshold = 0; - + private boolean nodeUpdatesRescheduleEnabled; + @SuppressWarnings("rawtypes") public AMNodeTracker(EventHandler eventHandler, AppContext appContext) { super("AMNodeMap"); @@ -74,10 +75,14 @@ public class AMNodeTracker extends AbstractService implements this.blacklistDisablePercent = conf.getInt( TezConfiguration.TEZ_AM_NODE_BLACKLISTING_IGNORE_THRESHOLD, TezConfiguration.TEZ_AM_NODE_BLACKLISTING_IGNORE_THRESHOLD_DEFAULT); + this.nodeUpdatesRescheduleEnabled = conf.getBoolean( + TezConfiguration.TEZ_AM_NODE_UNHEALTHY_RESCHEDULE_TASKS, + TezConfiguration.TEZ_AM_NODE_UNHEALTHY_RESCHEDULE_TASKS_DEFAULT); LOG.info("blacklistDisablePercent is " + blacklistDisablePercent + - ", blacklistingEnabled: " + nodeBlacklistingEnabled + - ", maxTaskFailuresPerNode: " + maxTaskFailuresPerNode); + ", blacklistingEnabled: " + nodeBlacklistingEnabled + + ", maxTaskFailuresPerNode: " + maxTaskFailuresPerNode + + ", nodeUpdatesRescheduleEnabled: " + nodeUpdatesRescheduleEnabled); if (blacklistDisablePercent < -1 || blacklistDisablePercent > 100) { throw new TezUncheckedException("Invalid blacklistDisablePercent: " @@ -88,7 +93,8 @@ public class AMNodeTracker extends AbstractService implements public void nodeSeen(NodeId nodeId) { if (nodeMap.putIfAbsent(nodeId, new AMNodeImpl(nodeId, maxTaskFailuresPerNode, - eventHandler, nodeBlacklistingEnabled, appContext)) == null) { + eventHandler, nodeBlacklistingEnabled, nodeUpdatesRescheduleEnabled, + appContext)) == null) { LOG.info("Adding new node: " + nodeId); } } http://git-wip-us.apache.org/repos/asf/tez/blob/d80e30d3/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java index d907ea0..0072f6a 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java @@ -321,6 +321,53 @@ public class TestAMNodeTracker { amNodeTracker.stop(); } + @Test(timeout=10000) + public void testNodeUnhealthyRescheduleTasksEnabled() throws Exception { + _testNodeUnhealthyRescheduleTasks(true); + } + + @Test(timeout=10000) + public void testNodeUnhealthyRescheduleTasksDisabled() throws Exception { + _testNodeUnhealthyRescheduleTasks(false); + } + + private void _testNodeUnhealthyRescheduleTasks(boolean rescheduleTasks) { + AppContext appContext = mock(AppContext.class); + Configuration conf = new Configuration(false); + conf.setBoolean(TezConfiguration.TEZ_AM_NODE_UNHEALTHY_RESCHEDULE_TASKS, + rescheduleTasks); + TestEventHandler handler = new TestEventHandler(); + AMNodeTracker amNodeTracker = new AMNodeTracker(handler, appContext); + doReturn(amNodeTracker).when(appContext).getNodeTracker(); + amNodeTracker.init(conf); + amNodeTracker.start(); + + // add a node + amNodeTracker.handle(new AMNodeEventNodeCountUpdated(1)); + NodeId nodeId = NodeId.newInstance("host1", 1234); + amNodeTracker.nodeSeen(nodeId); + AMNodeImpl node = (AMNodeImpl) amNodeTracker.get(nodeId); + + // simulate task starting on node + ContainerId cid = mock(ContainerId.class); + amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, cid)); + + // mark node unhealthy + NodeReport nodeReport = generateNodeReport(nodeId, NodeState.UNHEALTHY); + amNodeTracker.handle(new AMNodeEventStateChanged(nodeReport)); + assertEquals(AMNodeState.UNHEALTHY, node.getState()); + + // check for task rescheduling events + if (rescheduleTasks) { + assertEquals(1, handler.events.size()); + assertEquals(AMContainerEventType.C_NODE_FAILED, handler.events.get(0).getType()); + } else { + assertEquals(0, handler.events.size()); + } + + amNodeTracker.stop(); + } + private static NodeReport generateNodeReport(NodeId nodeId, NodeState nodeState) { NodeReport nodeReport = mock(NodeReport.class); doReturn(nodeId).when(nodeReport).getNodeId();
