Repository: tez Updated Branches: refs/heads/master 9fd0578cc -> 7e895f54b
TEZ-3828. Allow relaxing locality when retried task's priority is kept same (zhiyuany) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/7e895f54 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/7e895f54 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/7e895f54 Branch: refs/heads/master Commit: 7e895f54bb3f4e16d7d76970b602ef3c59271bd8 Parents: 9fd0578 Author: Zhiyuan Yang <[email protected]> Authored: Tue Sep 12 13:39:55 2017 -0700 Committer: Zhiyuan Yang <[email protected]> Committed: Tue Sep 12 13:39:55 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/tez/dag/api/TezConfiguration.java | 12 +++++++++++- .../main/java/org/apache/tez/dag/app/dag/Vertex.java | 1 + .../apache/tez/dag/app/dag/impl/TaskAttemptImpl.java | 2 +- .../org/apache/tez/dag/app/dag/impl/VertexImpl.java | 9 +++++++++ .../apache/tez/dag/app/dag/impl/TestTaskAttempt.java | 9 +++++++++ 5 files changed, 31 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/7e895f54/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index 5df5259..efe6d6c 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -593,7 +593,7 @@ public class TezConfiguration extends Configuration { /** * Boolean value. Specifies whether a re-scheduled attempt of a task, caused by previous - * failures gets special treatment - higher priority, dropped location hints. + * failures gets higher priority */ @ConfigurationScope(Scope.VERTEX) @ConfigurationProperty(type="boolean") @@ -602,6 +602,16 @@ public class TezConfiguration extends Configuration { public static final boolean TEZ_AM_TASK_RESCHEDULE_HIGHER_PRIORITY_DEFAULT=true; /** + * Boolean value. Specifies whether a re-scheduled attempt of a task, caused by previous + * failure get relaxed locality + */ + @ConfigurationScope(Scope.VERTEX) + @ConfigurationProperty(type="boolean") + public static final String TEZ_AM_TASK_RESCHEDULE_RELAXED_LOCALITY = + TEZ_AM_PREFIX + "task.reschedule.relaxed.locality"; + public static final boolean TEZ_AM_TASK_RESCHEDULE_RELAXED_LOCALITY_DEFAULT=true; + + /** * Boolean value. Enabled blacklisting of nodes of nodes that are considered faulty. These nodes * will not be used to execute tasks. */ http://git-wip-us.apache.org/repos/asf/tez/blob/7e895f54/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java index 0a6e9c5..4d0a4bf 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java @@ -206,5 +206,6 @@ public interface Vertex extends Comparable<Vertex> { interface VertexConfig { int getMaxFailedTaskAttempts(); boolean getTaskRescheduleHigherPriority(); + boolean getTaskRescheduleRelaxedLocality(); } } http://git-wip-us.apache.org/repos/asf/tez/blob/7e895f54/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index 3c8a9b5..1fe65a9 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -1314,7 +1314,7 @@ public class TaskAttemptImpl implements TaskAttempt, ta.taskRacks = racks; // Ask for hosts / racks only if not a re-scheduled task. - if (ta.isRescheduled && ta.getVertex().getVertexConfig().getTaskRescheduleHigherPriority()) { + if (ta.isRescheduled && ta.getVertex().getVertexConfig().getTaskRescheduleRelaxedLocality()) { locationHint = null; } http://git-wip-us.apache.org/repos/asf/tez/blob/7e895f54/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index 59552f2..209db5a 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -4648,6 +4648,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl private final int maxFailedTaskAttempts; private final boolean taskRescheduleHigherPriority; + private final boolean taskRescheduleRelaxedLocality; public VertexConfigImpl(Configuration conf) { this.maxFailedTaskAttempts = conf.getInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, @@ -4655,6 +4656,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl this.taskRescheduleHigherPriority = conf.getBoolean(TezConfiguration.TEZ_AM_TASK_RESCHEDULE_HIGHER_PRIORITY, TezConfiguration.TEZ_AM_TASK_RESCHEDULE_HIGHER_PRIORITY_DEFAULT); + this.taskRescheduleRelaxedLocality = + conf.getBoolean(TezConfiguration.TEZ_AM_TASK_RESCHEDULE_RELAXED_LOCALITY, + TezConfiguration.TEZ_AM_TASK_RESCHEDULE_RELAXED_LOCALITY_DEFAULT); } @Override @@ -4666,5 +4670,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl public boolean getTaskRescheduleHigherPriority() { return taskRescheduleHigherPriority; } + + @Override + public boolean getTaskRescheduleRelaxedLocality() { + return taskRescheduleRelaxedLocality; + } } } http://git-wip-us.apache.org/repos/asf/tez/blob/7e895f54/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java index a9d0c8d..7709bc0 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java @@ -212,8 +212,14 @@ public class TestTaskAttempt { // Override the test defaults to setup the config change TezConfiguration vertexConf = new TezConfiguration(); vertexConf.setBoolean(TezConfiguration.TEZ_AM_TASK_RESCHEDULE_HIGHER_PRIORITY, false); + vertexConf.setBoolean(TezConfiguration.TEZ_AM_TASK_RESCHEDULE_RELAXED_LOCALITY, true); when(mockVertex.getVertexConfig()).thenReturn(new VertexImpl.VertexConfigImpl(vertexConf)); + // set locality + Set<String> hosts = new TreeSet<String>(); + hosts.add("host1"); + locationHint = TaskLocationHint.createTaskLocationHint(hosts, null); + TaskAttemptImpl.ScheduleTaskattemptTransition sta = new TaskAttemptImpl.ScheduleTaskattemptTransition(); @@ -241,12 +247,15 @@ public class TestTaskAttempt { verify(eventHandler, times(1)).handle(arg.capture()); AMSchedulerEventTALaunchRequest launchEvent = (AMSchedulerEventTALaunchRequest) arg.getValue(); Assert.assertEquals(2, launchEvent.getPriority()); + Assert.assertEquals(1, launchEvent.getLocationHint().getHosts().size()); + Assert.assertTrue(launchEvent.getLocationHint().getHosts().contains("host1")); // Verify priority for a retried attempt is the same sta.transition(taImplReScheduled, sEvent); verify(eventHandler, times(2)).handle(arg.capture()); launchEvent = (AMSchedulerEventTALaunchRequest) arg.getValue(); Assert.assertEquals(2, launchEvent.getPriority()); + Assert.assertNull(launchEvent.getLocationHint()); } @Test(timeout = 5000)
