Repository: tez Updated Branches: refs/heads/branch-0.9 0838c40fd -> 29f654e2b
Revert "TEZ-3982. DAGAppMaster and tasks should not report negative or invalid progress (Kuhu Shukla via jeagles)" This reverts commit 0838c40fd73995745a52eca3307548120facfe63. Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/29f654e2 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/29f654e2 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/29f654e2 Branch: refs/heads/branch-0.9 Commit: 29f654e2bba7114d784bb121ae9a74dbee1bc13f Parents: 0838c40 Author: Jason Lowe <[email protected]> Authored: Fri Sep 21 08:30:00 2018 -0500 Committer: Jason Lowe <[email protected]> Committed: Fri Sep 21 08:30:00 2018 -0500 ---------------------------------------------------------------------- .../org/apache/tez/common/ProgressHelper.java | 5 +- .../org/apache/tez/dag/app/DAGAppMaster.java | 2 +- .../apache/tez/dag/app/dag/impl/DAGImpl.java | 12 +-- .../apache/tez/dag/app/TestDAGAppMaster.java | 83 -------------------- 4 files changed, 4 insertions(+), 98 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/29f654e2/tez-api/src/main/java/org/apache/tez/common/ProgressHelper.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/common/ProgressHelper.java b/tez-api/src/main/java/org/apache/tez/common/ProgressHelper.java index 07b066c..407a20e 100644 --- a/tez-api/src/main/java/org/apache/tez/common/ProgressHelper.java +++ b/tez-api/src/main/java/org/apache/tez/common/ProgressHelper.java @@ -47,10 +47,7 @@ public class ProgressHelper { if (inputs != null && inputs.size() != 0) { for (LogicalInput input : inputs.values()) { if (input instanceof AbstractLogicalInput) { - float inputProgress = ((AbstractLogicalInput) input).getProgress(); - if (inputProgress >= 0.0f && inputProgress <= 1.0f) { - progSum += inputProgress; - } + progSum += ((AbstractLogicalInput) input).getProgress(); } } progress = (1.0f) * progSum / inputs.size(); http://git-wip-us.apache.org/repos/asf/tez/blob/29f654e2/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 7ff47fa..c4b8df0 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -1271,7 +1271,7 @@ public class DAGAppMaster extends AbstractService { } public float getProgress() { - if (isSession && getState().equals(DAGAppMasterState.IDLE)) { + if (isSession && state.equals(DAGAppMasterState.IDLE)) { return 0.0f; } if(currentDAG != null) { http://git-wip-us.apache.org/repos/asf/tez/blob/29f654e2/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java index 8cb39a2..ecd8d17 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java @@ -804,17 +804,9 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, try { float progress = 0.0f; for (Vertex v : getVertices().values()) { - float vertexProgress = v.getProgress(); - if (vertexProgress >= 0.0f && vertexProgress <= 1.0f) { - progress += vertexProgress; - } - } - float dagProgress = progress / getTotalVertices(); - if (dagProgress >= 0.0f && progress <= 1.0f) { - return dagProgress; - } else { - return 0.0f; + progress += v.getProgress(); } + return progress / getTotalVertices(); } finally { this.readLock.unlock(); } http://git-wip-us.apache.org/repos/asf/tez/blob/29f654e2/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java index 7a7dfe2..570c6dc 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java @@ -14,18 +14,12 @@ package org.apache.tez.dag.app; -import org.apache.hadoop.yarn.util.MonotonicClock; -import org.apache.tez.dag.app.dag.DAGState; -import org.apache.tez.dag.app.dag.Vertex; -import org.apache.tez.dag.records.TezVertexID; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; import java.io.ByteArrayInputStream; import java.io.DataInput; @@ -35,10 +29,8 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.HashMap; import java.util.LinkedList; import java.util.List; -import java.util.Map; import com.google.common.base.Preconditions; import com.google.common.collect.BiMap; @@ -395,81 +387,6 @@ public class TestDAGAppMaster { testDagCredentials(true); } - @Test - public void testBadProgress() throws Exception { - TezConfiguration conf = new TezConfiguration(); - conf.setBoolean(TezConfiguration.TEZ_AM_CREDENTIALS_MERGE, true); - conf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true); - conf.set(TezConfiguration.TEZ_AM_STAGING_DIR, TEST_DIR.toString()); - ApplicationId appId = ApplicationId.newInstance(1, 1); - ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1); - - // create some sample AM credentials - Credentials amCreds = new Credentials(); - JobTokenSecretManager jtsm = new JobTokenSecretManager(); - JobTokenIdentifier identifier = new JobTokenIdentifier( - new Text(appId.toString())); - Token<JobTokenIdentifier> sessionToken = - new Token<JobTokenIdentifier>(identifier, jtsm); - sessionToken.setService(identifier.getJobId()); - TokenCache.setSessionToken(sessionToken, amCreds); - TestTokenSecretManager ttsm = new TestTokenSecretManager(); - Text tokenAlias1 = new Text("alias1"); - Token<TestTokenIdentifier> amToken1 = new Token<TestTokenIdentifier>( - new TestTokenIdentifier(new Text("amtoken1")), ttsm); - amCreds.addToken(tokenAlias1, amToken1); - - FileSystem fs = FileSystem.getLocal(conf); - FSDataOutputStream sessionJarsPBOutStream = - TezCommonUtils.createFileForAM(fs, new Path(TEST_DIR.toString(), - TezConstants.TEZ_AM_LOCAL_RESOURCES_PB_FILE_NAME)); - DAGProtos.PlanLocalResourcesProto.getDefaultInstance() - .writeDelimitedTo(sessionJarsPBOutStream); - sessionJarsPBOutStream.close(); - DAGAppMaster am = spy(new DAGAppMaster(attemptId, - ContainerId.newContainerId(attemptId, 1), - "127.0.0.1", 0, 0, new MonotonicClock(), 1, true, - TEST_DIR.toString(), new String[] {TEST_DIR.toString()}, - new String[] {TEST_DIR.toString()}, - new TezApiVersionInfo().getVersion(), amCreds, - "someuser", null)); - when(am.getState()).thenReturn(DAGAppMasterState.RUNNING); - am.init(conf); - am.start(); - Credentials dagCreds = new Credentials(); - Token<TestTokenIdentifier> dagToken1 = new Token<TestTokenIdentifier>( - new TestTokenIdentifier(new Text("dagtoken1")), ttsm); - dagCreds.addToken(tokenAlias1, dagToken1); - Text tokenAlias3 = new Text("alias3"); - Token<TestTokenIdentifier> dagToken2 = new Token<TestTokenIdentifier>( - new TestTokenIdentifier(new Text("dagtoken2")), ttsm); - dagCreds.addToken(tokenAlias3, dagToken2); - TezDAGID dagId = TezDAGID.getInstance(appId, 1); - DAGPlan dagPlan = DAGPlan.newBuilder() - .setName("somedag") - .setCredentialsBinary( - DagTypeConverters.convertCredentialsToProto(dagCreds)) - .build(); - DAGImpl dag = spy(am.createDAG(dagPlan, dagId)); - am.setCurrentDAG(dag); - when(dag.getState()).thenReturn(DAGState.RUNNING); - Map<TezVertexID, Vertex> map = new HashMap<TezVertexID, Vertex>(); - TezVertexID mockVertexID = mock(TezVertexID.class); - Vertex mockVertex = mock(Vertex.class); - when(mockVertex.getProgress()).thenReturn(Float.NaN); - map.put(mockVertexID, mockVertex); - when(dag.getVertices()).thenReturn(map); - when(dag.getTotalVertices()).thenReturn(1); - Assert.assertEquals("Progress was NaN and should be reported as 0", - 0, am.getProgress(), 0); - when(mockVertex.getProgress()).thenReturn(-10f); - Assert.assertEquals("Progress was negative and should be reported as 0", - 0, am.getProgress(), 0); - when(mockVertex.getProgress()).thenReturn(10f); - Assert.assertEquals("Progress was greater than 1 and should be reported as 0", - 0, am.getProgress(), 0); - } - @SuppressWarnings("deprecation") private void testDagCredentials(boolean doMerge) throws IOException { TezConfiguration conf = new TezConfiguration();
