Repository: tez Updated Branches: refs/heads/branch-0.9 29f654e2b -> 9f13abde3
TEZ-3982. DAGAppMaster and tasks should not report negative or invalid progress (Kuhu Shukla via jlowe) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/9f13abde Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/9f13abde Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/9f13abde Branch: refs/heads/branch-0.9 Commit: 9f13abde3efb238228d8493452884af3bf11e5b3 Parents: 29f654e Author: Jason Lowe <[email protected]> Authored: Fri Sep 21 09:26:48 2018 -0500 Committer: Jason Lowe <[email protected]> Committed: Fri Sep 21 09:26:48 2018 -0500 ---------------------------------------------------------------------- .../org/apache/tez/common/ProgressHelper.java | 5 +- .../org/apache/tez/dag/app/DAGAppMaster.java | 2 +- .../apache/tez/dag/app/dag/impl/DAGImpl.java | 12 ++- .../apache/tez/dag/app/TestDAGAppMaster.java | 82 ++++++++++++++++++++ 4 files changed, 97 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/9f13abde/tez-api/src/main/java/org/apache/tez/common/ProgressHelper.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/common/ProgressHelper.java b/tez-api/src/main/java/org/apache/tez/common/ProgressHelper.java index 407a20e..07b066c 100644 --- a/tez-api/src/main/java/org/apache/tez/common/ProgressHelper.java +++ b/tez-api/src/main/java/org/apache/tez/common/ProgressHelper.java @@ -47,7 +47,10 @@ public class ProgressHelper { if (inputs != null && inputs.size() != 0) { for (LogicalInput input : inputs.values()) { if (input instanceof AbstractLogicalInput) { - progSum += ((AbstractLogicalInput) input).getProgress(); + float inputProgress = ((AbstractLogicalInput) input).getProgress(); + if (inputProgress >= 0.0f && inputProgress <= 1.0f) { + progSum += inputProgress; + } } } progress = (1.0f) * progSum / inputs.size(); http://git-wip-us.apache.org/repos/asf/tez/blob/9f13abde/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index c4b8df0..7ff47fa 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -1271,7 +1271,7 @@ public class DAGAppMaster extends AbstractService { } public float getProgress() { - if (isSession && state.equals(DAGAppMasterState.IDLE)) { + if (isSession && getState().equals(DAGAppMasterState.IDLE)) { return 0.0f; } if(currentDAG != null) { http://git-wip-us.apache.org/repos/asf/tez/blob/9f13abde/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java index ecd8d17..8cb39a2 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java @@ -804,9 +804,17 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, try { float progress = 0.0f; for (Vertex v : getVertices().values()) { - progress += v.getProgress(); + float vertexProgress = v.getProgress(); + if (vertexProgress >= 0.0f && vertexProgress <= 1.0f) { + progress += vertexProgress; + } + } + float dagProgress = progress / getTotalVertices(); + if (dagProgress >= 0.0f && progress <= 1.0f) { + return dagProgress; + } else { + return 0.0f; } - return progress / getTotalVertices(); } finally { this.readLock.unlock(); } http://git-wip-us.apache.org/repos/asf/tez/blob/9f13abde/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java index 570c6dc..56c8a72 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java @@ -14,12 +14,17 @@ package org.apache.tez.dag.app; +import org.apache.tez.dag.app.dag.DAGState; +import org.apache.tez.dag.app.dag.Vertex; +import org.apache.tez.dag.records.TezVertexID; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.io.ByteArrayInputStream; import java.io.DataInput; @@ -29,8 +34,10 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; +import java.util.Map; import com.google.common.base.Preconditions; import com.google.common.collect.BiMap; @@ -387,6 +394,81 @@ public class TestDAGAppMaster { testDagCredentials(true); } + @Test + public void testBadProgress() throws Exception { + TezConfiguration conf = new TezConfiguration(); + conf.setBoolean(TezConfiguration.TEZ_AM_CREDENTIALS_MERGE, true); + conf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true); + conf.set(TezConfiguration.TEZ_AM_STAGING_DIR, TEST_DIR.toString()); + ApplicationId appId = ApplicationId.newInstance(1, 1); + ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1); + + // create some sample AM credentials + Credentials amCreds = new Credentials(); + JobTokenSecretManager jtsm = new JobTokenSecretManager(); + JobTokenIdentifier identifier = new JobTokenIdentifier( + new Text(appId.toString())); + Token<JobTokenIdentifier> sessionToken = + new Token<JobTokenIdentifier>(identifier, jtsm); + sessionToken.setService(identifier.getJobId()); + TokenCache.setSessionToken(sessionToken, amCreds); + TestTokenSecretManager ttsm = new TestTokenSecretManager(); + Text tokenAlias1 = new Text("alias1"); + Token<TestTokenIdentifier> amToken1 = new Token<TestTokenIdentifier>( + new TestTokenIdentifier(new Text("amtoken1")), ttsm); + amCreds.addToken(tokenAlias1, amToken1); + + FileSystem fs = FileSystem.getLocal(conf); + FSDataOutputStream sessionJarsPBOutStream = + TezCommonUtils.createFileForAM(fs, new Path(TEST_DIR.toString(), + TezConstants.TEZ_AM_LOCAL_RESOURCES_PB_FILE_NAME)); + DAGProtos.PlanLocalResourcesProto.getDefaultInstance() + .writeDelimitedTo(sessionJarsPBOutStream); + sessionJarsPBOutStream.close(); + DAGAppMaster am = spy(new DAGAppMaster(attemptId, + ContainerId.newContainerId(attemptId, 1), + "127.0.0.1", 0, 0, new SystemClock(), 1, true, + TEST_DIR.toString(), new String[] {TEST_DIR.toString()}, + new String[] {TEST_DIR.toString()}, + new TezApiVersionInfo().getVersion(), amCreds, + "someuser", null)); + when(am.getState()).thenReturn(DAGAppMasterState.RUNNING); + am.init(conf); + am.start(); + Credentials dagCreds = new Credentials(); + Token<TestTokenIdentifier> dagToken1 = new Token<TestTokenIdentifier>( + new TestTokenIdentifier(new Text("dagtoken1")), ttsm); + dagCreds.addToken(tokenAlias1, dagToken1); + Text tokenAlias3 = new Text("alias3"); + Token<TestTokenIdentifier> dagToken2 = new Token<TestTokenIdentifier>( + new TestTokenIdentifier(new Text("dagtoken2")), ttsm); + dagCreds.addToken(tokenAlias3, dagToken2); + TezDAGID dagId = TezDAGID.getInstance(appId, 1); + DAGPlan dagPlan = DAGPlan.newBuilder() + .setName("somedag") + .setCredentialsBinary( + DagTypeConverters.convertCredentialsToProto(dagCreds)) + .build(); + DAGImpl dag = spy(am.createDAG(dagPlan, dagId)); + am.setCurrentDAG(dag); + when(dag.getState()).thenReturn(DAGState.RUNNING); + Map<TezVertexID, Vertex> map = new HashMap<TezVertexID, Vertex>(); + TezVertexID mockVertexID = mock(TezVertexID.class); + Vertex mockVertex = mock(Vertex.class); + when(mockVertex.getProgress()).thenReturn(Float.NaN); + map.put(mockVertexID, mockVertex); + when(dag.getVertices()).thenReturn(map); + when(dag.getTotalVertices()).thenReturn(1); + Assert.assertEquals("Progress was NaN and should be reported as 0", + 0, am.getProgress(), 0); + when(mockVertex.getProgress()).thenReturn(-10f); + Assert.assertEquals("Progress was negative and should be reported as 0", + 0, am.getProgress(), 0); + when(mockVertex.getProgress()).thenReturn(10f); + Assert.assertEquals("Progress was greater than 1 and should be reported as 0", + 0, am.getProgress(), 0); + } + @SuppressWarnings("deprecation") private void testDagCredentials(boolean doMerge) throws IOException { TezConfiguration conf = new TezConfiguration();
