TEZ-3271. Provide mapreduce failures.maxpercent equivalent. (jeagles)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a33d2213 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a33d2213 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a33d2213 Branch: refs/heads/TEZ-3334 Commit: a33d2213b54cd3dfddbff4155d047674a4416f35 Parents: 43ca78f Author: Jonathan Eagles <[email protected]> Authored: Mon Dec 5 13:18:33 2016 -0600 Committer: Jonathan Eagles <[email protected]> Committed: Mon Dec 5 13:18:33 2016 -0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/tez/dag/api/TezConfiguration.java | 10 +- .../org/apache/tez/dag/app/dag/impl/Edge.java | 28 +++++ .../apache/tez/dag/app/dag/impl/VertexImpl.java | 70 +++++++++-- .../tez/dag/app/dag/impl/TestVertexImpl.java | 116 ++++++++++++++++++- .../java/org/apache/tez/test/TestTezJobs.java | 50 ++++++++ 6 files changed, 262 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/a33d2213/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index d9a7ca6..6bbbfb2 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3271. Provide mapreduce failures.maxpercent equivalent. TEZ-3222. Reduce messaging overhead for auto-reduce parallelism case. TEZ-3547. Add TaskAssignment Analyzer. TEZ-3508. TestTaskScheduler cleanup. 
http://git-wip-us.apache.org/repos/asf/tez/blob/a33d2213/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index a09e888..e27cdf8 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -994,7 +994,15 @@ public class TezConfiguration extends Configuration { public static final String TEZ_AM_PREEMPTION_PERCENTAGE = TEZ_AM_PREFIX + "preemption.percentage"; public static final int TEZ_AM_PREEMPTION_PERCENTAGE_DEFAULT = 10; - + + /** + * Float value. Specifies the allowable percentage, in the range 0.0-100.0, of failed tasks per + * vertex. Once all tasks complete, a vertex whose failed-task percentage is at or below this value still succeeds. + */ + @ConfigurationScope(Scope.VERTEX) + public static final String TEZ_VERTEX_FAILURES_MAXPERCENT = + "tez.vertex.failures.maxpercent"; + public static final float TEZ_VERTEX_FAILURES_MAXPERCENT_DEFAULT = 0.0f; /** * Int value. The number of RM heartbeats to wait after preempting running tasks before preempting * more running tasks.
After preempting a task, we need to wait at least 1 heartbeat so that the http://git-wip-us.apache.org/repos/asf/tez/blob/a33d2213/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java index 9640f06..690df63 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java @@ -23,9 +23,12 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.zip.Deflater; import org.apache.hadoop.conf.Configuration; +import org.apache.tez.common.TezCommonUtils; import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.yarn.event.EventHandler; @@ -68,6 +71,34 @@ public class Edge { private static final Logger LOG = LoggerFactory.getLogger(Edge.class); + public List<TezEvent> generateEmptyEventsForAttempt(TezTaskAttemptID attempt) throws Exception { + + if (!edgeProperty.getEdgeSource().getClassName().startsWith("org.apache.tez")) { + throw new TezException("Only org.apache.tez outputs are allowed for max percent failure feature.
Disallowed Output: " + + edgeProperty.getEdgeSource().getClassName()); + } + List<Event> events = new ArrayList<>(); + Deflater deflater = TezCommonUtils.newBestCompressionDeflater(); + try { + ShuffleUtils.generateEventsForNonStartedOutput(events, + edgeManager.getNumDestinationConsumerTasks(attempt.getTaskID().getId()), null, false, true, deflater); + } catch (Exception e) { + throw new TezException(e); + } finally { + // Deflater holds native zlib memory; release it now instead of waiting for finalization. + deflater.end(); + } + EventMetaData sourceInfo = new EventMetaData(EventMetaData.EventProducerConsumerType.INPUT, + sourceVertex.getName(), getDestinationVertexName(), attempt); + + List<TezEvent> tezEvents = new ArrayList<>(events.size()); + for (Event e : events) { + TezEvent tezEvent = new TezEvent(e, sourceInfo); + tezEvents.add(tezEvent); + } + return tezEvents; + } + class EdgeManagerPluginContextImpl implements EdgeManagerPluginContext { private final UserPayload userPayload; http://git-wip-us.apache.org/repos/asf/tez/blob/a33d2213/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index 3f6debf..bf291b7 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -255,6 +255,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl final ServicePluginInfo servicePluginInfo; + private final float maxFailuresPercent; + private boolean logSuccessDiagnostics = false; + //fields initialized in init @VisibleForTesting @@ -960,7 +963,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl if (isSpeculationEnabled()) { speculator = new LegacySpeculator(vertexConf, getAppContext(), this); } - + + maxFailuresPercent = vertexConf.getFloat(TezConfiguration.TEZ_VERTEX_FAILURES_MAXPERCENT,
+ TezConfiguration.TEZ_VERTEX_FAILURES_MAXPERCENT_DEFAULT); // This "this leak" is okay because the retained pointer is in an // instance variable. @@ -1987,7 +1992,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl void logJobHistoryVertexFinishedEvent() throws IOException { if (recoveryData == null || !recoveryData.isVertexSucceeded()) { - logJobHistoryVertexCompletedHelper(VertexState.SUCCEEDED, finishTime, "", + logJobHistoryVertexCompletedHelper(VertexState.SUCCEEDED, finishTime, + logSuccessDiagnostics ? StringUtils.join(getDiagnostics(), LINE_SEPARATOR) : "", getAllCounters()); } } @@ -2121,10 +2127,52 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl if (vertex.completedTaskCount == vertex.tasks.size()) { // finished - gather stats vertex.finalStatistics = vertex.constructStatistics(); - - //Only succeed if tasks complete successfully and no terminationCause is registered. - if(vertex.succeededTaskCount == vertex.tasks.size() && vertex.terminationCause == null) { - LOG.info("All tasks have succeeded, vertex:" + vertex.logIdentifier); + + //Only succeed if tasks complete successfully and no terminationCause is registered or if failures are at or below the configured threshold.
+ boolean vertexSucceeded = vertex.succeededTaskCount == vertex.numTasks; + boolean vertexFailuresBelowThreshold = (vertex.succeededTaskCount + vertex.failedTaskCount == vertex.numTasks) + && (vertex.failedTaskCount * 100 <= vertex.maxFailuresPercent * vertex.numTasks); + + if((vertexSucceeded || vertexFailuresBelowThreshold) && vertex.terminationCause == null) { + if(vertexSucceeded) { + LOG.info("All tasks have succeeded, vertex:" + vertex.logIdentifier); + } else { + LOG.info("All tasks in the vertex " + vertex.logIdentifier + " have completed and the percentage of failed tasks (failed/total) (" + vertex.failedTaskCount + "/" + vertex.numTasks + ") is less than or equal to the threshold of " + vertex.maxFailuresPercent); + vertex.addDiagnostic("Vertex succeeded as percentage of failed tasks (failed/total) (" + vertex.failedTaskCount + "/" + vertex.numTasks + ") is less than or equal to the threshold of " + vertex.maxFailuresPercent); + vertex.logSuccessDiagnostics = true; + for (Task task : vertex.tasks.values()) { + if (!task.getState().equals(TaskState.FAILED)) { + continue; + } + // Find the last attempt and mark that as successful + Iterator<TezTaskAttemptID> attempts = task.getAttempts().keySet().iterator(); + TezTaskAttemptID lastAttempt = null; + while (attempts.hasNext()) { + TezTaskAttemptID attempt = attempts.next(); + if (lastAttempt == null || attempt.getId() > lastAttempt.getId()) { + lastAttempt = attempt; + } + } + LOG.info("Succeeding failed task attempt:" + lastAttempt); + for (Map.Entry<Vertex, Edge> vertexEdge : vertex.targetVertices.entrySet()) { + Vertex destVertex = vertexEdge.getKey(); + Edge edge = vertexEdge.getValue(); + try { + List<TezEvent> tezEvents = edge.generateEmptyEventsForAttempt(lastAttempt); + + // Downstream vertices need to receive a SUCCEEDED completion event for each failed task to ensure num bipartite count is correct + VertexEventTaskAttemptCompleted completionEvent = new VertexEventTaskAttemptCompleted(lastAttempt,
TaskAttemptStateInternal.SUCCEEDED); + + // Notify all target vertices + vertex.eventHandler.handle(new VertexEventSourceTaskAttemptCompleted(destVertex.getVertexId(), completionEvent)); + vertex.eventHandler.handle(new VertexEventRouteEvent(destVertex.getVertexId(), tezEvents)); + } catch (Exception e) { + throw new TezUncheckedException(e); + } + } + } + } + if (vertex.commitVertexOutputs && !vertex.committed.getAndSet(true)) { // start commit if there're commits or just finish if no commits return commitOrFinish(vertex); @@ -3440,11 +3488,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl vertex.completedTasksStatsCache.mergeFrom(((TaskImpl) task).getStatistics()); } } else if (taskEvent.getState() == TaskState.FAILED) { - LOG.info("Failing vertex: " + vertex.logIdentifier + - " because task failed: " + taskEvent.getTaskID()); - vertex.tryEnactKill(VertexTerminationCause.OWN_TASK_FAILURE, TaskTerminationCause.OTHER_TASK_FAILURE); - forceTransitionToKillWait = true; taskFailed(vertex, task); + if (vertex.failedTaskCount * 100 > vertex.maxFailuresPercent * vertex.numTasks) { + LOG.info("Failing vertex: " + vertex.logIdentifier + + " because task failed: " + taskEvent.getTaskID()); + vertex.tryEnactKill(VertexTerminationCause.OWN_TASK_FAILURE, TaskTerminationCause.OTHER_TASK_FAILURE); + forceTransitionToKillWait = true; + } } else if (taskEvent.getState() == TaskState.KILLED) { taskKilled(vertex, task); } http://git-wip-us.apache.org/repos/asf/tez/blob/a33d2213/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java index b7e63f6..a11311d 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java +++
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java @@ -56,11 +56,14 @@ import org.apache.tez.common.TezCommonUtils; import org.apache.tez.common.counters.Limits; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.TezConstants; +import org.apache.tez.dag.app.dag.event.TaskEventTAFailed; import org.apache.tez.dag.app.dag.event.TaskEventTALaunched; import org.apache.tez.dag.app.dag.event.TaskEventTASucceeded; import org.apache.tez.hadoop.shim.DefaultHadoopShim; import org.apache.tez.runtime.api.TaskFailureType; import org.apache.tez.runtime.api.VertexStatistics; +import org.apache.tez.runtime.api.impl.OutputSpec; +import org.apache.tez.runtime.api.impl.TaskSpec; import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads; import org.apache.tez.test.GraceShuffleVertexManagerForTest; @@ -1823,7 +1826,7 @@ public class TestVertexImpl { EdgePlan.newBuilder() .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("i6_v4")) .setInputVertexName("vertex4") - .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o4")) + .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("org.apache.tez.o4")) .setOutputVertexName("vertex6") .setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER) .setId("e5") @@ -1835,7 +1838,7 @@ public class TestVertexImpl { EdgePlan.newBuilder() .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("i6_v5")) .setInputVertexName("vertex5") - .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o5")) + .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("org.apache.tez.o5")) .setOutputVertexName("vertex6") .setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER) .setId("e6") @@ -3740,10 +3743,119 @@ public class TestVertexImpl { Assert.assertEquals(VertexState.SUCCEEDED, v4.getState()); Assert.assertEquals(VertexState.SUCCEEDED,
v5.getState()); + Assert.assertEquals(VertexState.RUNNING, v6.getState()); + Assert.assertEquals(4, v6.numSuccessSourceAttemptCompletions); + + } + + @Test(timeout = 5000) + public void testFailuresMaxPercentSourceTaskAttemptCompletionEvents() throws TezException { + LOG.info("Testing testFailuresMaxPercentSourceTaskAttemptCompletionEvents"); + // Override the basic setup for this test to inject the specific config setting needed for this test + useCustomInitializer = false; + customInitializer = null; + setupPreDagCreation(); + conf.setFloat(TezConfiguration.TEZ_VERTEX_FAILURES_MAXPERCENT, 50.0f); + conf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 1); + dagPlan = createTestDAGPlan(); + setupPostDagCreation(); + initAllVertices(VertexState.INITED); + + VertexImpl v4 = vertices.get("vertex4"); + VertexImpl v5 = vertices.get("vertex5"); + VertexImpl v6 = vertices.get("vertex6"); + + startVertex(vertices.get("vertex1")); + startVertex(vertices.get("vertex2")); + dispatcher.await(); + LOG.info("Verifying v6 state " + v6.getState()); + Assert.assertEquals(VertexState.RUNNING, v6.getState()); + + TezTaskID t1_v4 = TezTaskID.getInstance(v4.getVertexId(), 0); + TezTaskID t2_v4 = TezTaskID.getInstance(v4.getVertexId(), 1); + TezTaskID t1_v5 = TezTaskID.getInstance(v5.getVertexId(), 0); + TezTaskID t2_v5 = TezTaskID.getInstance(v5.getVertexId(), 1); + + TezTaskAttemptID ta1_t1_v4 = TezTaskAttemptID.getInstance(t1_v4, 0); + TezTaskAttemptID ta1_t2_v4 = TezTaskAttemptID.getInstance(t2_v4, 0); + TezTaskAttemptID ta1_t1_v5 = TezTaskAttemptID.getInstance(t1_v5, 0); + TezTaskAttemptID ta1_t2_v5 = TezTaskAttemptID.getInstance(t2_v5, 0); + + TaskSpec taskSpec = new TaskSpec("dag", "vertex", 2, new ProcessorDescriptor(), new ArrayList<InputSpec>(), + new ArrayList<OutputSpec>(), null, conf); + TaskLocationHint locationHint = TaskLocationHint.createTaskLocationHint(null, null); + + // Tasks can only succeed from a scheduled or running state
+ dispatcher.getEventHandler().handle(new TaskEventScheduleTask(t1_v4, taskSpec, locationHint, false)); + dispatcher.getEventHandler().handle(new TaskEventScheduleTask(t2_v4, taskSpec, locationHint, false)); + + // Failed tasks (1 of 2 in each vertex) are within the max percent failure threshold + dispatcher.getEventHandler().handle(new TaskEventTAFailed(ta1_t1_v4, TaskFailureType.NON_FATAL, null)); + dispatcher.getEventHandler().handle(new TaskEventTASucceeded(ta1_t2_v4)); + dispatcher.getEventHandler().handle(new TaskEventTASucceeded(ta1_t1_v5)); + dispatcher.getEventHandler().handle(new TaskEventTAFailed(ta1_t2_v5, TaskFailureType.NON_FATAL, null)); + dispatcher.await(); + + Assert.assertEquals(VertexState.SUCCEEDED, v4.getState()); + Assert.assertEquals(VertexState.SUCCEEDED, v5.getState()); Assert.assertEquals(VertexState.RUNNING, v6.getState()); Assert.assertEquals(4, v6.numSuccessSourceAttemptCompletions); + } + @Test(timeout = 5000) + public void testFailuresMaxPercentExceededSourceTaskAttemptCompletionEvents() throws TezException { + LOG.info("Testing testFailuresMaxPercentExceededSourceTaskAttemptCompletionEvents"); + + // Override the basic setup for this test to inject the specific config setting needed for this test + useCustomInitializer = false; + customInitializer = null; + setupPreDagCreation(); + conf.setFloat(TezConfiguration.TEZ_VERTEX_FAILURES_MAXPERCENT, 50.0f); + conf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 1); + dagPlan = createTestDAGPlan(); + setupPostDagCreation(); + initAllVertices(VertexState.INITED); + + VertexImpl v4 = vertices.get("vertex4"); + VertexImpl v5 = vertices.get("vertex5"); + VertexImpl v6 = vertices.get("vertex6"); + + startVertex(vertices.get("vertex1")); + startVertex(vertices.get("vertex2")); + dispatcher.await(); + LOG.info("Verifying v6 state " + v6.getState()); + Assert.assertEquals(VertexState.RUNNING, v6.getState()); + + TezTaskID t1_v4 = TezTaskID.getInstance(v4.getVertexId(), 0); + TezTaskID t2_v4 =
TezTaskID.getInstance(v4.getVertexId(), 1); + TezTaskID t1_v5 = TezTaskID.getInstance(v5.getVertexId(), 0); + TezTaskID t2_v5 = TezTaskID.getInstance(v5.getVertexId(), 1); + + TezTaskAttemptID ta1_t1_v4 = TezTaskAttemptID.getInstance(t1_v4, 0); + TezTaskAttemptID ta1_t2_v4 = TezTaskAttemptID.getInstance(t2_v4, 0); + TezTaskAttemptID ta1_t1_v5 = TezTaskAttemptID.getInstance(t1_v5, 0); + TezTaskAttemptID ta1_t2_v5 = TezTaskAttemptID.getInstance(t2_v5, 0); + + TaskSpec taskSpec = new TaskSpec("dag", "vertex", 2, new ProcessorDescriptor(), new ArrayList<InputSpec>(), + new ArrayList<OutputSpec>(), null, conf); + TaskLocationHint locationHint = TaskLocationHint.createTaskLocationHint(null, null); + + // Tasks can only succeed from a scheduled or running state + dispatcher.getEventHandler().handle(new TaskEventScheduleTask(t1_v4, taskSpec, locationHint, false)); + dispatcher.getEventHandler().handle(new TaskEventScheduleTask(t2_v4, taskSpec, locationHint, false)); + + // Failed tasks in vertex4 (2 of 2) exceed the max percent failure threshold + dispatcher.getEventHandler().handle(new TaskEventTAFailed(ta1_t1_v4, TaskFailureType.NON_FATAL, null)); + dispatcher.getEventHandler().handle(new TaskEventTAFailed(ta1_t2_v4, TaskFailureType.NON_FATAL, null)); + dispatcher.getEventHandler().handle(new TaskEventTASucceeded(ta1_t1_v5)); + dispatcher.getEventHandler().handle(new TaskEventTAFailed(ta1_t2_v5, TaskFailureType.NON_FATAL, null)); + dispatcher.await(); + + Assert.assertEquals(VertexState.FAILED, v4.getState()); + Assert.assertEquals(VertexState.SUCCEEDED, v5.getState()); + Assert.assertEquals(VertexState.RUNNING, v6.getState()); + Assert.assertEquals(2, v6.numSuccessSourceAttemptCompletions); } @Test(timeout = 5000) http://git-wip-us.apache.org/repos/asf/tez/blob/a33d2213/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java index 241c6e9..479509d 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java +++
b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java @@ -43,14 +43,19 @@ import java.util.concurrent.locks.ReentrantLock; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.tez.common.counters.CounterGroup; import org.apache.tez.common.counters.TaskCounter; import org.apache.tez.common.counters.TezCounter; import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.dag.api.Edge; import org.apache.tez.dag.api.client.StatusGetOpts; import org.apache.tez.dag.api.client.VertexStatus; +import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig; +import org.apache.tez.runtime.library.partitioner.HashPartitioner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -1033,6 +1038,21 @@ public class TestTezJobs { } } + public static class FailingAttemptProcessor extends SimpleProcessor { + + public FailingAttemptProcessor(ProcessorContext context) { + super(context); + } + + @Override + public void run() throws Exception { + if (getContext().getTaskIndex() == 0) { + LOG.info("Failing task " + getContext().getTaskIndex() + ", attempt " + getContext().getTaskAttemptNumber()); + throw new IOException("Failing task " + getContext().getTaskIndex() + ", attempt " + getContext().getTaskAttemptNumber()); + } + } + } + public static class InputInitializerForTest extends InputInitializer { private final ReentrantLock lock = new ReentrantLock(); @@ -1281,5 +1301,35 @@ public class TestTezJobs { } } + @Test(timeout = 60000) + public void testVertexFailuresMaxPercent() throws TezException,
InterruptedException, IOException { + + TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig()); + tezConf.setFloat(TezConfiguration.TEZ_VERTEX_FAILURES_MAXPERCENT, 50.0f); + tezConf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 1); + TezClient tezClient = TezClient.create("TestVertexFailuresMaxPercent", tezConf); + tezClient.start(); + + try { + DAG dag = DAG.create("TestVertexFailuresMaxPercent"); + Vertex vertex1 = Vertex.create("Parent", ProcessorDescriptor.create( + FailingAttemptProcessor.class.getName()), 2); + Vertex vertex2 = Vertex.create("Child", ProcessorDescriptor.create(FailingAttemptProcessor.class.getName()), 2); + + OrderedPartitionedKVEdgeConfig edgeConfig = OrderedPartitionedKVEdgeConfig + .newBuilder(Text.class.getName(), IntWritable.class.getName(), + HashPartitioner.class.getName()) + .setFromConfiguration(tezConf) + .build(); + dag.addVertex(vertex1) + .addVertex(vertex2) + .addEdge(Edge.create(vertex1, vertex2, edgeConfig.createDefaultEdgeProperty())); + DAGClient dagClient = tezClient.submitDAG(dag); + dagClient.waitForCompletion(); + Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState()); + } finally { + tezClient.stop(); + } + } }
