Repository: tez Updated Branches: refs/heads/master 92e8927a2 -> c6c9f6ecd
TEZ-1788. Allow vertex level disabling of speculation (bikas) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/c6c9f6ec Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/c6c9f6ec Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/c6c9f6ec Branch: refs/heads/master Commit: c6c9f6ecd62e2d670a85a626ad054bbb68b0f4f7 Parents: 92e8927 Author: Bikas Saha <[email protected]> Authored: Thu Oct 8 11:39:27 2015 -0700 Committer: Bikas Saha <[email protected]> Committed: Thu Oct 8 11:39:27 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/tez/dag/app/TestSpeculation.java | 60 +++++++++++++++++++- 2 files changed, 60 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/c6c9f6ec/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 8cd6400..8e5da31 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ Release 0.8.1: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-1788. Allow vertex level disabling of speculation TEZ-2868. Fix setting Caller Context in Tez Examples. TEZ-2860. NPE in DAGClientImpl. TEZ-2855. Fix a potential NPE while routing VertexManager events. 
http://git-wip-us.apache.org/repos/asf/tez/blob/c6c9f6ec/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java index 3413762..9a39fac 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java @@ -26,6 +26,13 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.tez.common.counters.TaskCounter; import org.apache.tez.dag.api.DAG; +import org.apache.tez.dag.api.Edge; +import org.apache.tez.dag.api.EdgeProperty; +import org.apache.tez.dag.api.EdgeProperty.DataMovementType; +import org.apache.tez.dag.api.EdgeProperty.DataSourceType; +import org.apache.tez.dag.api.EdgeProperty.SchedulingType; +import org.apache.tez.dag.api.InputDescriptor; +import org.apache.tez.dag.api.OutputDescriptor; import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.Vertex; @@ -35,6 +42,7 @@ import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher; import org.apache.tez.dag.app.dag.Task; import org.apache.tez.dag.app.dag.TaskAttempt; import org.apache.tez.dag.app.dag.impl.DAGImpl; +import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager; import org.apache.tez.dag.records.TaskAttemptTerminationCause; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; @@ -57,6 +65,8 @@ public class TestSpeculation { defaultConf.set("fs.defaultFS", "file:///"); defaultConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true); defaultConf.setBoolean(TezConfiguration.TEZ_AM_SPECULATION_ENABLED, true); + defaultConf.setFloat(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, 1); + 
defaultConf.setFloat(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, 1); localFs = FileSystem.getLocal(defaultConf); String stagingDir = "target" + Path.SEPARATOR + TestSpeculation.class.getName() + "-tmpDir"; defaultConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDir); @@ -103,6 +113,7 @@ public class TestSpeculation { TezTaskAttemptID successTaId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), 1); mockLauncher.updateProgress(withProgress); + // cause speculation trigger mockLauncher.setStatusUpdatesForTask(killedTaId, 100); mockLauncher.startScheduling(true); @@ -116,7 +127,7 @@ public class TestSpeculation { Assert.assertEquals(TaskAttemptTerminationCause.TERMINATED_EFFECTIVE_SPECULATION, killedAttempt.getTerminationCause()); if (withProgress) { - // without progress updates occasionally more than 1 task specualates + // without progress updates occasionally more than 1 task speculates Assert.assertEquals(1, task.getCounters().findCounter(TaskCounter.NUM_SPECULATIONS) .getValue()); Assert.assertEquals(1, dagImpl.getAllCounters().findCounter(TaskCounter.NUM_SPECULATIONS) @@ -137,6 +148,53 @@ public class TestSpeculation { public void testBasicSpeculationWithoutProgress() throws Exception { testBasicSpeculation(false); } + + @Test (timeout=10000) + public void testBasicSpeculationPerVertexConf() throws Exception { + DAG dag = DAG.create("test"); + String vNameNoSpec = "A"; + String vNameSpec = "B"; + Vertex vA = Vertex.create(vNameNoSpec, ProcessorDescriptor.create("Proc.class"), 5); + Vertex vB = Vertex.create(vNameSpec, ProcessorDescriptor.create("Proc.class"), 5); + vA.setConf(TezConfiguration.TEZ_AM_SPECULATION_ENABLED, "false"); + dag.addVertex(vA); + dag.addVertex(vB); + // min/max src fraction is set to 1. 
So vertices will run sequentially + dag.addEdge( + Edge.create(vA, vB, + EdgeProperty.create(DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED, + SchedulingType.SEQUENTIAL, OutputDescriptor.create("O"), + InputDescriptor.create("I")))); + + MockTezClient tezClient = createTezSession(); + + DAGClient dagClient = tezClient.submitDAG(dag); + DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG(); + TezVertexID vertexId = dagImpl.getVertex(vNameSpec).getVertexId(); + TezVertexID vertexIdNoSpec = dagImpl.getVertex(vNameNoSpec).getVertexId(); + // original attempt is killed and speculative one is successful + TezTaskAttemptID killedTaId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), + 0); + TezTaskAttemptID noSpecTaId = TezTaskAttemptID + .getInstance(TezTaskID.getInstance(vertexIdNoSpec, 0), 0); + + // cause speculation trigger for both + mockLauncher.setStatusUpdatesForTask(killedTaId, 100); + mockLauncher.setStatusUpdatesForTask(noSpecTaId, 100); + + mockLauncher.startScheduling(true); + dagClient.waitForCompletion(); + Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState()); + org.apache.tez.dag.app.dag.Vertex vSpec = dagImpl.getVertex(vertexId); + org.apache.tez.dag.app.dag.Vertex vNoSpec = dagImpl.getVertex(vertexIdNoSpec); + // speculation for vB (enabled) but not for vA (disabled via vertex conf) + Assert.assertTrue(vSpec.getAllCounters().findCounter(TaskCounter.NUM_SPECULATIONS) + .getValue() > 0); + Assert.assertEquals(0, vNoSpec.getAllCounters().findCounter(TaskCounter.NUM_SPECULATIONS) + .getValue()); + + tezClient.stop(); + } @Test (timeout=10000) public void testBasicSpeculationNotUseful() throws Exception {
