TEZ-2621. rebase 07/14. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/0539d315 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/0539d315 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/0539d315 Branch: refs/heads/TEZ-2003 Commit: 0539d315d1ef716267e3ce81c188e404133a47e2 Parents: b5459e2 Author: Siddharth Seth <[email protected]> Authored: Tue Jul 14 17:26:25 2015 -0700 Committer: Siddharth Seth <[email protected]> Committed: Thu Aug 6 01:26:10 2015 -0700 ---------------------------------------------------------------------- TEZ-2003-CHANGES.txt | 1 + .../app/TestTaskAttemptListenerImplTezDag.java | 25 +++++++++++++++++--- .../tez/dag/app/dag/impl/TestTaskAttempt.java | 7 ++++-- 3 files changed, 28 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/0539d315/TEZ-2003-CHANGES.txt ---------------------------------------------------------------------- diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt index 1e8abcf..590fe7f 100644 --- a/TEZ-2003-CHANGES.txt +++ b/TEZ-2003-CHANGES.txt @@ -32,5 +32,6 @@ ALL CHANGES: TEZ-2502. Fix for TezTaskRunner2 not killing tasks properly in all situations. TEZ-2508. rebase 06/01 TEZ-2526. Fix version for tez-history-parser. + TEZ-2621. rebase 07/14 INCOMPATIBLE CHANGES: http://git-wip-us.apache.org/repos/asf/tez/blob/0539d315/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java index 68d3baf..7f0362d 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java @@ -35,7 +35,9 @@ import java.util.Map; import java.util.Random; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -45,7 +47,9 @@ import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tez.common.ContainerContext; import org.apache.tez.common.ContainerTask; +import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.JobTokenSecretManager; +import org.apache.tez.common.security.TokenCache; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.ContainerEndReason; import org.apache.tez.dag.api.TaskAttemptEndReason; @@ -284,11 +288,18 @@ public class TestTaskAttemptListenerImplTezDag { } } + // TODO TEZ-2003 Move this into TestTezTaskCommunicator. Potentially other tests as well. @Test (timeout= 5000) public void testPortRange_NotSpecified() { Configuration conf = new Configuration(); + JobTokenIdentifier identifier = new JobTokenIdentifier(new Text( + "fakeIdentifier")); + Token<JobTokenIdentifier> sessionToken = new Token<JobTokenIdentifier>(identifier, + new JobTokenSecretManager()); + sessionToken.setService(identifier.getJobId()); + TokenCache.setSessionToken(sessionToken, credentials); taskAttemptListener = new TaskAttemptListenerImpTezDag(appContext, - mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null); + mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null, null, false); // no exception happen, should started properly taskAttemptListener.init(conf); taskAttemptListener.start(); @@ -298,12 +309,20 @@ public class TestTaskAttemptListenerImplTezDag { boolean succeedToAllocate = true; try { Configuration conf = new Configuration(); + + JobTokenIdentifier identifier = new JobTokenIdentifier(new Text( + "fakeIdentifier")); + Token<JobTokenIdentifier> sessionToken = new Token<JobTokenIdentifier>(identifier, + new JobTokenSecretManager()); + sessionToken.setService(identifier.getJobId()); + TokenCache.setSessionToken(sessionToken, credentials); + conf.set(TezConfiguration.TEZ_AM_TASK_AM_PORT_RANGE, port + "-" + port); taskAttemptListener = new TaskAttemptListenerImpTezDag(appContext, - mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null); + mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null, null, false); taskAttemptListener.init(conf); taskAttemptListener.start(); - int resultedPort = taskAttemptListener.getAddress().getPort(); + int resultedPort = taskAttemptListener.getTaskCommunicator(0).getAddress().getPort(); assertEquals(port, resultedPort); } catch (Exception e) { succeedToAllocate = false; http://git-wip-us.apache.org/repos/asf/tez/blob/0539d315/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java index d6fc46e..2bf1c85 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java @@ -21,6 +21,7 @@ package org.apache.tez.dag.app.dag.impl; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -1270,8 +1271,10 @@ public class TestTaskAttempt { MockEventHandler eventHandler = spy(new MockEventHandler()); TaskAttemptListener taListener = mock(TaskAttemptListener.class); - when(taListener.getAddress()).thenReturn( + TaskCommunicator mockTaskComm = mock(TaskCommunicator.class); + when(mockTaskComm.getAddress()).thenReturn( new InetSocketAddress("localhost", 0)); + when(taListener.getTaskCommunicator(any(int.class))).thenReturn(mockTaskComm); Configuration taskConf = new Configuration(); taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); @@ -1292,7 +1295,7 @@ public class TestTaskAttempt { AMContainerMap containers = new AMContainerMap( mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class), new ContainerContextMatcher(), appCtx); - containers.addContainerIfNew(container); + containers.addContainerIfNew(container, 0, 0, 0); doReturn(new ClusterInfo()).when(appCtx).getClusterInfo(); doReturn(containers).when(appCtx).getAllContainers();
