Repository: tez Updated Branches: refs/heads/master 7d73bb2dc -> 39d76a656
TEZ-3969. TaskAttemptImpl: static fields initialized in instance ctor (Jaume Marhuenda via jegales) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/39d76a65 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/39d76a65 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/39d76a65 Branch: refs/heads/master Commit: 39d76a656216d4843908279ef8eaa29a4cc83104 Parents: 7d73bb2 Author: Jaume Marhuenda <[email protected]> Authored: Tue Oct 9 13:12:17 2018 -0500 Committer: Jonathan Eagles <[email protected]> Committed: Tue Oct 9 13:12:17 2018 -0500 ---------------------------------------------------------------------- tez-dag/findbugs-exclude.xml | 11 ----- .../java/org/apache/tez/dag/app/dag/Vertex.java | 13 ++++++ .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 38 +++++++--------- .../apache/tez/dag/app/dag/impl/VertexImpl.java | 46 ++++++++++++++++++++ .../tez/dag/app/dag/impl/TestTaskAttempt.java | 28 +++++++++--- 5 files changed, 97 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/39d76a65/tez-dag/findbugs-exclude.xml ---------------------------------------------------------------------- diff --git a/tez-dag/findbugs-exclude.xml b/tez-dag/findbugs-exclude.xml index 1150ccb..a6ce380 100644 --- a/tez-dag/findbugs-exclude.xml +++ b/tez-dag/findbugs-exclude.xml @@ -252,15 +252,4 @@ <Bug pattern="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD"/> </Match> - <!-- TEZ-2552 --> - <Match> - <Class name="org.apache.tez.dag.app.dag.impl.TaskAttemptImpl"/> - <Or> - <Field name="MAX_ALLOWED_OUTPUT_FAILURES_FRACTION"/> - <Field name="MAX_ALLOWED_OUTPUT_FAILURES"/> - <Field name="MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC"/> - </Or> - <Bug pattern="ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD"/> - </Match> - </FindBugsFilter> http://git-wip-us.apache.org/repos/asf/tez/blob/39d76a65/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java index 0e54e9f..0b2406f 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java @@ -213,6 +213,19 @@ public interface Vertex extends Comparable<Vertex> { int getMaxFailedTaskAttempts(); boolean getTaskRescheduleHigherPriority(); boolean getTaskRescheduleRelaxedLocality(); + + /** + * @return tez.task.max.allowed.output.failures. + */ + int getMaxAllowedOutputFailures(); + /** + * @return tez.task.max.allowed.output.failures.fraction. + */ + double getMaxAllowedOutputFailuresFraction(); + /** + * @return tez.am.max.allowed.time-sec.for-read-error. + */ + int getMaxAllowedTimeForTaskReadErrorSec(); } void incrementRejectedTaskAttemptCount(); http://git-wip-us.apache.org/repos/asf/tez/blob/39d76a65/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index bbec9ea..7399979 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -214,9 +214,6 @@ public class TaskAttemptImpl implements TaskAttempt, Set<String> taskRacks = new HashSet<String>(); private Map<TezTaskAttemptID, Long> uniquefailedOutputReports = Maps.newHashMap(); - private static double MAX_ALLOWED_OUTPUT_FAILURES_FRACTION; - private static int MAX_ALLOWED_OUTPUT_FAILURES; - private static int MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC; protected final boolean isRescheduled; private final Resource taskResource; @@ -548,18 +545,6 @@ public class TaskAttemptImpl implements TaskAttempt, Vertex vertex, TaskLocationHint locationHint, TaskSpec taskSpec, TezTaskAttemptID schedulingCausalTA) { - // TODO: Move these configs over to Vertex.VertexConfig - MAX_ALLOWED_OUTPUT_FAILURES = conf.getInt(TezConfiguration - .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES, TezConfiguration - .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_DEFAULT); - - MAX_ALLOWED_OUTPUT_FAILURES_FRACTION = conf.getDouble(TezConfiguration - .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_FRACTION, TezConfiguration - .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_FRACTION_DEFAULT); - - MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC = conf.getInt( - TezConfiguration.TEZ_AM_MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC, - TezConfiguration.TEZ_AM_MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC_DEFAULT); ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); this.readLock = rwLock.readLock(); this.writeLock = rwLock.writeLock(); @@ -1793,17 +1778,24 @@ public class TaskAttemptImpl implements TaskAttempt, attempt.uniquefailedOutputReports.put(failedDestTaId, time); firstErrReportTime = time; } - + + int maxAllowedOutputFailures = attempt.getVertex().getVertexConfig() + .getMaxAllowedOutputFailures(); + int maxAllowedTimeForTaskReadErrorSec = attempt.getVertex() + .getVertexConfig().getMaxAllowedTimeForTaskReadErrorSec(); + double maxAllowedOutputFailuresFraction = attempt.getVertex() + .getVertexConfig().getMaxAllowedOutputFailuresFraction(); + int readErrorTimespanSec = (int)((time - firstErrReportTime)/1000); - boolean crossTimeDeadline = readErrorTimespanSec >= MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC; + boolean crossTimeDeadline = readErrorTimespanSec >= maxAllowedTimeForTaskReadErrorSec; int runningTasks = attempt.appContext.getCurrentDAG().getVertex( failedDestTaId.getTaskID().getVertexID()).getRunningTasks(); float failureFraction = runningTasks > 0 ? ((float) attempt.uniquefailedOutputReports.size()) / runningTasks : 0; boolean withinFailureFractionLimits = - (failureFraction <= MAX_ALLOWED_OUTPUT_FAILURES_FRACTION); + (failureFraction <= maxAllowedOutputFailuresFraction); boolean withinOutputFailureLimits = - (attempt.uniquefailedOutputReports.size() < MAX_ALLOWED_OUTPUT_FAILURES); + (attempt.uniquefailedOutputReports.size() < maxAllowedOutputFailures); // If needed we can launch a background task without failing this task // to generate a copy of the output just in case. @@ -1813,10 +1805,12 @@ public class TaskAttemptImpl implements TaskAttempt, } String message = attempt.getID() + " being failed for too many output errors. " + "failureFraction=" + failureFraction - + ", MAX_ALLOWED_OUTPUT_FAILURES_FRACTION=" + MAX_ALLOWED_OUTPUT_FAILURES_FRACTION + + ", MAX_ALLOWED_OUTPUT_FAILURES_FRACTION=" + + maxAllowedOutputFailuresFraction + ", uniquefailedOutputReports=" + attempt.uniquefailedOutputReports.size() - + ", MAX_ALLOWED_OUTPUT_FAILURES=" + MAX_ALLOWED_OUTPUT_FAILURES - + ", MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC=" + MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC + + ", MAX_ALLOWED_OUTPUT_FAILURES=" + maxAllowedOutputFailures + + ", MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC=" + + maxAllowedTimeForTaskReadErrorSec + ", readErrorTimespan=" + readErrorTimespanSec; LOG.info(message); attempt.addDiagnosticInfo(message); http://git-wip-us.apache.org/repos/asf/tez/blob/39d76a65/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index 0184657..a4d2de1 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -4690,6 +4690,19 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl private final boolean taskRescheduleHigherPriority; private final boolean taskRescheduleRelaxedLocality; + /** + * See tez.task.max.allowed.output.failures.fraction. + */ + private final double maxAllowedOutputFailuresFraction; + /** + * See tez.task.max.allowed.output.failures. + */ + private final int maxAllowedOutputFailures; + /** + * See tez.am.max.allowed.time-sec.for-read-error. + */ + private final int maxAllowedTimeForTaskReadErrorSec; + public VertexConfigImpl(Configuration conf) { this.maxFailedTaskAttempts = conf.getInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS_DEFAULT); @@ -4699,6 +4712,18 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl this.taskRescheduleRelaxedLocality = conf.getBoolean(TezConfiguration.TEZ_AM_TASK_RESCHEDULE_RELAXED_LOCALITY, TezConfiguration.TEZ_AM_TASK_RESCHEDULE_RELAXED_LOCALITY_DEFAULT); + + this.maxAllowedOutputFailures = conf.getInt(TezConfiguration + .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES, TezConfiguration + .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_DEFAULT); + + this.maxAllowedOutputFailuresFraction = conf.getDouble(TezConfiguration + .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_FRACTION, TezConfiguration + .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_FRACTION_DEFAULT); + + this.maxAllowedTimeForTaskReadErrorSec = conf.getInt( + TezConfiguration.TEZ_AM_MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC, + TezConfiguration.TEZ_AM_MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC_DEFAULT); } @Override @@ -4715,5 +4740,26 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl public boolean getTaskRescheduleRelaxedLocality() { return taskRescheduleRelaxedLocality; } + + /** + * @return maxAllowedOutputFailures. + */ + @Override public int getMaxAllowedOutputFailures() { + return maxAllowedOutputFailures; + } + + /** + * @return maxAllowedOutputFailuresFraction. + */ + @Override public double getMaxAllowedOutputFailuresFraction() { + return maxAllowedOutputFailuresFraction; + } + + /** + * @return maxAllowedTimeForTaskReadErrorSec. + */ + @Override public int getMaxAllowedTimeForTaskReadErrorSec() { + return maxAllowedTimeForTaskReadErrorSec; + } } } http://git-wip-us.apache.org/repos/asf/tez/blob/39d76a65/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java index 5ab68f7..5038810 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java @@ -160,15 +160,20 @@ public class TestTaskAttempt { when(appCtx.getContainerLauncherName(anyInt())).thenReturn( TezConstants.getTezYarnServicePluginName()); - mockVertex = mock(Vertex.class); - when(mockVertex.getServicePluginInfo()).thenReturn(servicePluginInfo); - when(mockVertex.getVertexConfig()).thenReturn(new VertexImpl.VertexConfigImpl(vertexConf)); + createMockVertex(vertexConf); HistoryEventHandler mockHistHandler = mock(HistoryEventHandler.class); doReturn(mockHistHandler).when(appCtx).getHistoryHandler(); LogManager.getRootLogger().setLevel(Level.DEBUG); } + private void createMockVertex(Configuration conf) { + mockVertex = mock(Vertex.class); + when(mockVertex.getServicePluginInfo()).thenReturn(servicePluginInfo); + when(mockVertex.getVertexConfig()).thenReturn( + new VertexImpl.VertexConfigImpl(conf)); + } + @Test(timeout = 5000) public void testLocalityRequest() { TaskAttemptImpl.ScheduleTaskattemptTransition sta = @@ -1919,7 +1924,11 @@ public class TestTaskAttempt { verify(eventHandler, times(expectedEventsAfterFetchFailure)).handle( arg.capture()); - taskConf.setInt(TezConfiguration.TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES, 1); + Configuration newVertexConf = new Configuration(vertexConf); + newVertexConf.setInt(TezConfiguration.TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES, + 1); + createMockVertex(newVertexConf); + TezTaskID taskID2 = TezTaskID.getInstance(vertexID, 2); MockTaskAttemptImpl taImpl2 = new MockTaskAttemptImpl(taskID2, 1, eventHandler, taListener, taskConf, new SystemClock(), @@ -1953,8 +1962,15 @@ public class TestTaskAttempt { Clock mockClock = mock(Clock.class); int readErrorTimespanSec = 1; - taskConf.setInt(TezConfiguration.TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES, 10); - taskConf.setInt(TezConfiguration.TEZ_AM_MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC, readErrorTimespanSec); + + newVertexConf = new Configuration(vertexConf); + newVertexConf.setInt(TezConfiguration.TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES, + 10); + newVertexConf.setInt( + TezConfiguration.TEZ_AM_MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC, + readErrorTimespanSec); + createMockVertex(newVertexConf); + TezTaskID taskID3 = TezTaskID.getInstance(vertexID, 3); MockTaskAttemptImpl taImpl3 = new MockTaskAttemptImpl(taskID3, 1, eventHandler, taListener, taskConf, mockClock,
