Repository: tez Updated Branches: refs/heads/TEZ-2980 d83904453 -> 478a5349d
TEZ-3053. Containers timeout if they do not receive a task within the container timeout interval. (sseth) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/e171fddc Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/e171fddc Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/e171fddc Branch: refs/heads/TEZ-2980 Commit: e171fddce3ae657dcc2baaf6b50913ef06a1d70c Parents: cc06400 Author: Siddharth Seth <[email protected]> Authored: Wed Jan 20 10:30:41 2016 -0800 Committer: Siddharth Seth <[email protected]> Committed: Wed Jan 20 10:30:41 2016 -0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../tez/dag/app/TezTaskCommunicatorImpl.java | 3 +- .../dag/app/TestTezTaskCommunicatorManager.java | 72 ++++++++++++++++++++ 3 files changed, 75 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/e171fddc/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index f1cc292..3b8b016 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -6,6 +6,7 @@ Release 0.8.3: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3053. Containers timeout if they do not receive a task within the container timeout interval. TEZ-2898. tez tools : swimlanes.py is broken. TEZ-2937. Can Processor.close() be called after closing inputs and outputs? TEZ-3037. History URL should be set regardless of which history logging service is enabled. http://git-wip-us.apache.org/repos/asf/tez/blob/e171fddc/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java index 0bbe97a..b879f07 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java @@ -410,7 +410,7 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator { private ContainerTask getContainerTask(ContainerId containerId) throws IOException { ContainerInfo containerInfo = registeredContainers.get(containerId); - ContainerTask task = null; + ContainerTask task; if (containerInfo == null) { if (getContext().isKnownContainer(containerId)) { LOG.info("Container with id: " + containerId @@ -422,6 +422,7 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator { task = TASK_FOR_INVALID_JVM; } else { synchronized (containerInfo) { + getContext().containerAlive(containerId); if (containerInfo.taskSpec != null) { if (!containerInfo.taskPulled) { containerInfo.taskPulled = true; http://git-wip-us.apache.org/repos/asf/tez/blob/e171fddc/tez-dag/src/test/java/org/apache/tez/dag/app/dag/app/TestTezTaskCommunicatorManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/app/TestTezTaskCommunicatorManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/app/TestTezTaskCommunicatorManager.java new file mode 100644 index 0000000..65f43a8 --- /dev/null +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/app/TestTezTaskCommunicatorManager.java @@ -0,0 +1,72 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.app.dag.app; + +import static org.junit.Assert.assertNull; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.tez.common.ContainerContext; +import org.apache.tez.common.ContainerTask; +import org.apache.tez.common.TezUtils; +import org.apache.tez.dag.api.UserPayload; +import org.apache.tez.dag.app.TezTaskCommunicatorImpl; +import org.apache.tez.serviceplugins.api.TaskCommunicatorContext; +import org.junit.Test; + +public class TestTezTaskCommunicatorManager { + + @Test (timeout = 5000) + public void testContainerAliveOnGetTask() throws IOException { + + TaskCommunicatorContext context = mock(TaskCommunicatorContext.class); + Configuration conf = new Configuration(false); + UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf); + + + + + ApplicationId appId = ApplicationId.newInstance(1000, 1); + ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1); + ContainerId containerId = createContainerId(appId, 1); + + doReturn(appAttemptId).when(context).getApplicationAttemptId(); + doReturn(userPayload).when(context).getInitialUserPayload(); + doReturn(new Credentials()).when(context).getCredentials(); + + TezTaskCommunicatorImpl taskComm = new TezTaskCommunicatorImpl(context); + + ContainerContext containerContext = new ContainerContext(containerId.toString()); + taskComm.registerRunningContainer(containerId, "fakehost", 0); + ContainerTask containerTask = taskComm.getUmbilical().getTask(containerContext); + assertNull(containerTask); + + verify(context).containerAlive(containerId); + } + + @SuppressWarnings("deprecation") + private ContainerId createContainerId(ApplicationId applicationId, int containerIdx) { + ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(applicationId, 1); + return ContainerId.newInstance(appAttemptId, containerIdx); + } +}
