TEZ-2678. Fix comments from reviews - part 1. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/2ecffa71 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/2ecffa71 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/2ecffa71 Branch: refs/heads/master Commit: 2ecffa7150da2350bd6d2c8cd9fa3f97bccaf373 Parents: 80cd3fe Author: Siddharth Seth <[email protected]> Authored: Tue Aug 11 11:19:09 2015 -0700 Committer: Siddharth Seth <[email protected]> Committed: Fri Aug 21 18:15:23 2015 -0700 ---------------------------------------------------------------------- TEZ-2003-CHANGES.txt | 1 + .../java/org/apache/tez/client/TezClient.java | 2 +- .../org/apache/tez/client/TezClientUtils.java | 14 +- .../main/java/org/apache/tez/dag/api/DAG.java | 76 ++++++++++- .../apache/tez/dag/api/DagTypeConverters.java | 2 +- .../api/ContainerLauncherOperationBase.java | 8 +- .../api/ServicePluginsDescriptor.java | 13 ++ .../api/TaskAttemptEndReason.java | 2 +- .../java/org/apache/tez/dag/api/TestDAG.java | 2 +- .../org/apache/tez/dag/api/TestDAGPlan.java | 113 ++++++++++++++- .../tez/dag/api/TestDagTypeConverters.java | 11 +- .../org/apache/tez/common/TezUtilsInternal.java | 5 +- .../apache/tez/dag/api/TaskCommunicator.java | 12 +- .../tez/dag/api/TaskCommunicatorContext.java | 2 +- .../apache/tez/dag/app/TaskAttemptListener.java | 4 +- .../dag/app/TaskAttemptListenerImpTezDag.java | 9 +- .../dag/app/TaskCommunicatorContextImpl.java | 15 +- .../tez/dag/app/TezTaskCommunicatorImpl.java | 10 +- .../apache/tez/dag/app/dag/impl/DAGImpl.java | 13 +- .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 9 +- .../app/launcher/ContainerLauncherRouter.java | 19 +-- .../tez/dag/app/rm/AMSchedulerEventTAEnded.java | 8 +- .../dag/app/rm/LocalTaskSchedulerService.java | 2 +- .../dag/app/rm/TaskSchedulerEventHandler.java | 5 +- .../dag/app/rm/YarnTaskSchedulerService.java | 5 +- .../dag/app/rm/container/AMContainerImpl.java | 62 +++++---- .../tez/dag/app/rm/node/AMNodeTracker.java | 2 
+- .../app/TestTaskAttemptListenerImplTezDag.java | 6 +- .../app/TestTaskAttemptListenerImplTezDag2.java | 3 +- .../app/TestTaskCommunicatorContextImpl.java | 85 ++++++++++++ .../dag/app/TestTaskCommunicatorManager.java | 4 +- .../tez/dag/app/dag/impl/TestTaskAttempt.java | 12 +- .../launcher/TestContainerLauncherRouter.java | 6 +- .../tez/dag/app/rm/TestContainerReuse.java | 136 +++++++++++++------ .../app/rm/TestLocalTaskSchedulerService.java | 4 +- .../tez/dag/app/rm/TestTaskScheduler.java | 18 +-- .../app/rm/TestTaskSchedulerEventHandler.java | 13 +- .../dag/app/rm/container/TestAMContainer.java | 127 +++++++++++------ .../org/apache/tez/examples/JoinValidate.java | 8 ++ tez-ext-service-tests/pom.xml | 5 - .../rm/TezTestServiceTaskSchedulerService.java | 5 +- .../TezTestServiceTaskCommunicatorImpl.java | 10 +- .../tez/service/impl/ContainerRunnerImpl.java | 2 +- .../apache/tez/runtime/task/TezTaskRunner2.java | 16 +-- .../runtime/task/TaskExecutionTestHelpers.java | 1 + .../runtime/task/TestContainerExecution.java | 1 + 46 files changed, 658 insertions(+), 230 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/2ecffa71/TEZ-2003-CHANGES.txt ---------------------------------------------------------------------- diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt index 75fac88..fd3374e 100644 --- a/TEZ-2003-CHANGES.txt +++ b/TEZ-2003-CHANGES.txt @@ -45,5 +45,6 @@ ALL CHANGES: TEZ-2126. Add unit tests for verifying multiple schedulers, launchers, communicators. TEZ-2698. rebase 08/05 TEZ-2675. Add javadocs for new pluggable components, fix problems reported by jenkins + TEZ-2678. Fix comments from reviews - part 1. 
INCOMPATIBLE CHANGES: http://git-wip-us.apache.org/repos/asf/tez/blob/2ecffa71/tez-api/src/main/java/org/apache/tez/client/TezClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java index 9e7fe51..e3e9e74 100644 --- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java +++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java @@ -473,7 +473,7 @@ public class TezClient { Map<String, LocalResource> tezJarResources = getTezJarResources(sessionCredentials); DAGPlan dagPlan = TezClientUtils.prepareAndCreateDAGPlan(dag, amConfig, tezJarResources, - usingTezArchiveDeploy, sessionCredentials, aclConfigs); + usingTezArchiveDeploy, sessionCredentials, aclConfigs, servicePluginsDescriptor); SubmitDAGRequestProto.Builder requestBuilder = SubmitDAGRequestProto.newBuilder(); requestBuilder.setDAGPlan(dagPlan).build(); http://git-wip-us.apache.org/repos/asf/tez/blob/2ecffa71/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java index 6086fa1..ecf5c07 100644 --- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java +++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java @@ -609,7 +609,7 @@ public class TezClientUtils { if(dag != null) { DAGPlan dagPB = prepareAndCreateDAGPlan(dag, amConfig, tezJarResources, tezLrsAsArchive, - sessionCreds); + sessionCreds, servicePluginsDescriptor); // emit protobuf DAG file style Path binaryPath = TezCommonUtils.getTezBinPlanStagingPath(tezSysStagingPath); @@ -685,18 +685,19 @@ public class TezClientUtils { static DAGPlan prepareAndCreateDAGPlan(DAG dag, AMConfiguration amConfig, Map<String, LocalResource> tezJarResources, boolean 
tezLrsAsArchive, - Credentials credentials) throws IOException { + Credentials credentials, ServicePluginsDescriptor servicePluginsDescriptor) throws IOException { return prepareAndCreateDAGPlan(dag, amConfig, tezJarResources, tezLrsAsArchive, credentials, - null); + null, servicePluginsDescriptor); } static DAGPlan prepareAndCreateDAGPlan(DAG dag, AMConfiguration amConfig, Map<String, LocalResource> tezJarResources, boolean tezLrsAsArchive, - Credentials credentials, Map<String, String> additionalDAGConfigs) throws IOException { + Credentials credentials, Map<String, String> additionalDAGConfigs, + ServicePluginsDescriptor servicePluginsDescriptor) throws IOException { Credentials dagCredentials = setupDAGCredentials(dag, credentials, amConfig.getTezConfiguration()); return dag.createDag(amConfig.getTezConfiguration(), dagCredentials, tezJarResources, - amConfig.getBinaryConfLR(), tezLrsAsArchive, additionalDAGConfigs); + amConfig.getBinaryConfLR(), tezLrsAsArchive, additionalDAGConfigs, servicePluginsDescriptor); } static void maybeAddDefaultLoggingJavaOpts(String logLevel, List<String> vargs) { @@ -776,7 +777,7 @@ public class TezClientUtils { } AMPluginDescriptorProto pluginDescriptorProto = - DagTypeConverters.convertServicePluginDescriptoToProto(servicePluginsDescriptor); + DagTypeConverters.convertServicePluginDescriptorToProto(servicePluginsDescriptor); builder.setAmPluginDescriptor(pluginDescriptorProto); return builder.build(); @@ -1035,4 +1036,5 @@ public class TezClientUtils { } } } + } http://git-wip-us.apache.org/repos/asf/tez/blob/2ecffa71/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java index 927039a..78bb660 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java @@ -35,6 +35,8 @@ 
import org.apache.commons.collections4.bidimap.DualLinkedHashBidiMap; import org.apache.hadoop.classification.InterfaceStability; import org.apache.tez.dag.api.Vertex.VertexExecutionContext; import org.apache.tez.dag.api.records.DAGProtos; +import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor; +import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -714,14 +716,15 @@ public class DAG { Map<String, LocalResource> tezJarResources, LocalResource binaryConfig, boolean tezLrsAsArchive) { return createDag(tezConf, extraCredentials, tezJarResources, binaryConfig, tezLrsAsArchive, - null); + null, null); } // create protobuf message describing DAG @Private public synchronized DAGPlan createDag(Configuration tezConf, Credentials extraCredentials, Map<String, LocalResource> tezJarResources, LocalResource binaryConfig, - boolean tezLrsAsArchive, Map<String, String> additionalConfigs) { + boolean tezLrsAsArchive, Map<String, String> additionalConfigs, + ServicePluginsDescriptor servicePluginsDescriptor) { verify(true); DAGPlan.Builder dagBuilder = DAGPlan.newBuilder(); @@ -732,6 +735,7 @@ public class DAG { // Setup default execution context. 
VertexExecutionContext defaultContext = getDefaultExecutionContext(); + verifyExecutionContext(defaultContext, servicePluginsDescriptor, "DAGDefault"); if (defaultContext != null) { DAGProtos.VertexExecutionContextProto contextProto = DagTypeConverters.convertToProto( defaultContext); @@ -834,6 +838,7 @@ public class DAG { // Vertex ExecutionContext setup VertexExecutionContext execContext = vertex.getVertexExecutionContext(); + verifyExecutionContext(execContext, servicePluginsDescriptor, vertex.getName()); if (execContext != null) { DAGProtos.VertexExecutionContextProto contextProto = DagTypeConverters.convertToProto(execContext); @@ -986,4 +991,71 @@ public class DAG { return dagBuilder.build(); } + + private void verifyExecutionContext(VertexExecutionContext executionContext, + ServicePluginsDescriptor servicePluginsDescriptor, + String context) { + if (executionContext != null) { + if (executionContext.shouldExecuteInContainers()) { + if (servicePluginsDescriptor == null || !servicePluginsDescriptor.areContainersEnabled()) { + throw new IllegalStateException("Invalid configuration. ExecutionContext for " + context + + " specifies container execution but this is disabled in the ServicePluginDescriptor"); + } + } + if (executionContext.shouldExecuteInAm()) { + if (servicePluginsDescriptor == null || !servicePluginsDescriptor.isUberEnabled()) { + throw new IllegalStateException("Invalid configuration. ExecutionContext for " + context + + " specifies AM execution but this is disabled in the ServicePluginDescriptor"); + } + } + if (executionContext.getTaskSchedulerName() != null) { + boolean found = false; + if (servicePluginsDescriptor != null) { + found = checkNamedEntityExists(executionContext.getTaskSchedulerName(), + servicePluginsDescriptor.getTaskSchedulerDescriptors()); + } + if (!found) { + throw new IllegalStateException("Invalid configuration. 
ExecutionContext for " + context + + " specifies task scheduler as " + executionContext.getTaskSchedulerName() + + " which is not part of the ServicePluginDescriptor"); + } + } + if (executionContext.getContainerLauncherName() != null) { + boolean found = false; + if (servicePluginsDescriptor != null) { + found = checkNamedEntityExists(executionContext.getContainerLauncherName(), + servicePluginsDescriptor.getContainerLauncherDescriptors()); + } + if (!found) { + throw new IllegalStateException("Invalid configuration. ExecutionContext for " + context + + " specifies container launcher as " + executionContext.getContainerLauncherName() + + " which is not part of the ServicePluginDescriptor"); + } + } + if (executionContext.getTaskCommName() != null) { + boolean found = false; + if (servicePluginsDescriptor != null) { + found = checkNamedEntityExists(executionContext.getTaskCommName(), + servicePluginsDescriptor.getTaskCommunicatorDescriptors()); + } + if (!found) { + throw new IllegalStateException("Invalid configuration. 
ExecutionContext for " + context + + " specifies task communicator as " + executionContext.getTaskCommName() + + " which is not part of the ServicePluginDescriptor"); + } + } + } + } + + private boolean checkNamedEntityExists(String expected, NamedEntityDescriptor[] namedEntities) { + if (namedEntities == null) { + return false; + } + for (NamedEntityDescriptor named : namedEntities) { + if (named.getEntityName().equals(expected)) { + return true; + } + } + return false; + } } http://git-wip-us.apache.org/repos/asf/tez/blob/2ecffa71/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java index 61e4d33..2823a86 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java @@ -801,7 +801,7 @@ public class DagTypeConverters { return builder.build(); } - public static AMPluginDescriptorProto convertServicePluginDescriptoToProto( + public static AMPluginDescriptorProto convertServicePluginDescriptorToProto( ServicePluginsDescriptor servicePluginsDescriptor) { AMPluginDescriptorProto.Builder pluginDescriptorBuilder = AMPluginDescriptorProto.newBuilder(); http://git-wip-us.apache.org/repos/asf/tez/blob/2ecffa71/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherOperationBase.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherOperationBase.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherOperationBase.java index 260b681..98806fa 100644 --- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherOperationBase.java +++ 
b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherOperationBase.java @@ -42,8 +42,8 @@ public class ContainerLauncherOperationBase { } /** - * Get the node on whcih this container is to be launched - * @return + * Get the node on which this container is to be launched + * @return the node id for the container */ public NodeId getNodeId() { return nodeId; @@ -51,7 +51,7 @@ public class ContainerLauncherOperationBase { /** * Get the containerId for the container - * @return + * @return the container id for the container operation */ public ContainerId getContainerId() { return containerId; @@ -59,7 +59,7 @@ public class ContainerLauncherOperationBase { /** * Get the security token for the container. Primarily for YARN - * @return + * @return the token for the container launch. */ public Token getContainerToken() { return containerToken; http://git-wip-us.apache.org/repos/asf/tez/blob/2ecffa71/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java index ce35350..113b7db 100644 --- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java +++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java @@ -14,6 +14,8 @@ package org.apache.tez.serviceplugins.api; +import java.util.Arrays; + import com.google.common.base.Preconditions; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -138,4 +140,15 @@ public class ServicePluginsDescriptor { public TaskCommunicatorDescriptor[] getTaskCommunicatorDescriptors() { return taskCommunicatorDescriptors; } + + @Override + public String toString() { + return "ServicePluginsDescriptor{" +
"enableContainers=" + enableContainers + + ", enableUber=" + enableUber + + ", taskSchedulerDescriptors=" + Arrays.toString(taskSchedulerDescriptors) + + ", containerLauncherDescriptors=" + Arrays.toString(containerLauncherDescriptors) + + ", taskCommunicatorDescriptors=" + Arrays.toString(taskCommunicatorDescriptors) + + '}'; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tez/blob/2ecffa71/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskAttemptEndReason.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskAttemptEndReason.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskAttemptEndReason.java index 4255c28..bff36cd 100644 --- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskAttemptEndReason.java +++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskAttemptEndReason.java @@ -22,7 +22,7 @@ import org.apache.hadoop.classification.InterfaceStability; public enum TaskAttemptEndReason { NODE_FAILED, // Completed because the node running the container was marked as dead COMMUNICATION_ERROR, // Communication error with the task - SERVICE_BUSY, // External service busy + EXECUTOR_BUSY, // External service busy INTERNAL_PREEMPTION, // Preempted by the AM, due to an internal decision EXTERNAL_PREEMPTION, // Preempted due to cluster contention APPLICATION_ERROR, // An error in the AM caused by user code http://git-wip-us.apache.org/repos/asf/tez/blob/2ecffa71/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java index 3fe17df..268267b 100644 --- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java +++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java @@ -86,7 +86,7 @@ public class 
TestDAG { dummyTaskCount, dummyTaskResource); DAG dag = DAG.create("testDAG"); - dag.createVertexGroup("group_1", v1,v2); + dag.createVertexGroup("group_1", v1, v2); try { dag.createVertexGroup("group_1", v2, v3); http://git-wip-us.apache.org/repos/asf/tez/blob/2ecffa71/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java index cd42109..7edea2f 100644 --- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java +++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java @@ -38,7 +38,6 @@ import org.apache.tez.dag.api.EdgeProperty.DataMovementType; import org.apache.tez.dag.api.EdgeProperty.DataSourceType; import org.apache.tez.dag.api.EdgeProperty.SchedulingType; import org.apache.tez.dag.api.Vertex.VertexExecutionContext; -import org.apache.tez.dag.api.records.DAGProtos; import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; import org.apache.tez.dag.api.records.DAGProtos.EdgePlan; import org.apache.tez.dag.api.records.DAGProtos.PlanTaskConfiguration; @@ -46,6 +45,10 @@ import org.apache.tez.dag.api.records.DAGProtos.PlanTaskLocationHint; import org.apache.tez.dag.api.records.DAGProtos.PlanVertexType; import org.apache.tez.dag.api.records.DAGProtos.VertexExecutionContextProto; import org.apache.tez.dag.api.records.DAGProtos.VertexPlan; +import org.apache.tez.serviceplugins.api.ContainerLauncherDescriptor; +import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor; +import org.apache.tez.serviceplugins.api.TaskCommunicatorDescriptor; +import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -317,6 +320,108 @@ public class TestDAGPlan { } @Test(timeout = 5000) + public void testInvalidExecContext_1() { + DAG dag = DAG.create("dag1"); + 
dag.setExecutionContext(VertexExecutionContext.createExecuteInAm(true)); + Vertex v1 = Vertex.create("testvertex", ProcessorDescriptor.create("processor1"), 1); + dag.addVertex(v1); + + try { + dag.createDag(new TezConfiguration(false), null, null, null, true, null, null); + fail("Expecting dag create to fail due to invalid ServicePluginDescriptor"); + } catch (IllegalStateException e) { + assertTrue(e.getMessage().contains("AM execution")); + } + + dag.setExecutionContext(VertexExecutionContext.createExecuteInContainers(true)); + + try { + dag.createDag(new TezConfiguration(false), null, null, null, true, null, null); + fail("Expecting dag create to fail due to invalid ServicePluginDescriptor"); + } catch (IllegalStateException e) { + assertTrue(e.getMessage().contains("container execution")); + } + + } + + @Test(timeout = 5000) + public void testInvalidExecContext_2() { + + ServicePluginsDescriptor servicePluginsDescriptor = ServicePluginsDescriptor + .create(false, + new TaskSchedulerDescriptor[]{TaskSchedulerDescriptor.create("plugin", null)}, + new ContainerLauncherDescriptor[]{ContainerLauncherDescriptor.create("plugin", null)}, + new TaskCommunicatorDescriptor[]{TaskCommunicatorDescriptor.create("plugin", null)}); + + VertexExecutionContext validExecContext = VertexExecutionContext.create("plugin", "plugin", + "plugin"); + VertexExecutionContext invalidExecContext1 = + VertexExecutionContext.create("invalidplugin", "plugin", "plugin"); + VertexExecutionContext invalidExecContext2 = + VertexExecutionContext.create("plugin", "invalidplugin", "plugin"); + VertexExecutionContext invalidExecContext3 = + VertexExecutionContext.create("plugin", "plugin", "invalidplugin"); + + + DAG dag = DAG.create("dag1"); + dag.setExecutionContext(VertexExecutionContext.createExecuteInContainers(true)); + Vertex v1 = Vertex.create("testvertex", ProcessorDescriptor.create("processor1"), 1); + dag.addVertex(v1); + + // Should succeed. Default context is containers. 
+ dag.createDag(new TezConfiguration(false), null, null, null, true, null, + servicePluginsDescriptor); + + + // Set execute in AM should fail + v1.setExecutionContext(VertexExecutionContext.createExecuteInAm(true)); + try { + dag.createDag(new TezConfiguration(false), null, null, null, true, null, servicePluginsDescriptor); + fail("Expecting dag create to fail due to invalid ServicePluginDescriptor"); + } catch (IllegalStateException e) { + assertTrue(e.getMessage().contains("AM execution")); + } + + // Valid context + v1.setExecutionContext(validExecContext); + dag.createDag(new TezConfiguration(false), null, null, null, true, null, servicePluginsDescriptor); + + // Invalid task scheduler + v1.setExecutionContext(invalidExecContext1); + try { + dag.createDag(new TezConfiguration(false), null, null, null, true, null, servicePluginsDescriptor); + fail("Expecting dag create to fail due to invalid ServicePluginDescriptor"); + } catch (IllegalStateException e) { + assertTrue(e.getMessage().contains("testvertex")); + assertTrue(e.getMessage().contains("task scheduler")); + assertTrue(e.getMessage().contains("invalidplugin")); + } + + // Invalid ContainerLauncher + v1.setExecutionContext(invalidExecContext2); + try { + dag.createDag(new TezConfiguration(false), null, null, null, true, null, servicePluginsDescriptor); + fail("Expecting dag create to fail due to invalid ServicePluginDescriptor"); + } catch (IllegalStateException e) { + assertTrue(e.getMessage().contains("testvertex")); + assertTrue(e.getMessage().contains("container launcher")); + assertTrue(e.getMessage().contains("invalidplugin")); + } + + // Invalid task comm + v1.setExecutionContext(invalidExecContext3); + try { + dag.createDag(new TezConfiguration(false), null, null, null, true, null, servicePluginsDescriptor); + fail("Expecting dag create to fail due to invalid ServicePluginDescriptor"); + } catch (IllegalStateException e) { + assertTrue(e.getMessage().contains("testvertex")); + 
assertTrue(e.getMessage().contains("task communicator")); + assertTrue(e.getMessage().contains("invalidplugin")); + } + + } + + @Test(timeout = 5000) public void testServiceDescriptorPropagation() { DAG dag = DAG.create("testDag"); ProcessorDescriptor pd1 = ProcessorDescriptor.create("processor1"). @@ -328,6 +433,10 @@ public class TestDAGPlan { VertexExecutionContext.create("plugin", "plugin", "plugin"); VertexExecutionContext v1Context = VertexExecutionContext.createExecuteInAm(true); + ServicePluginsDescriptor servicePluginsDescriptor = ServicePluginsDescriptor + .create(true, new TaskSchedulerDescriptor[]{TaskSchedulerDescriptor.create("plugin", null)}, + new ContainerLauncherDescriptor[]{ContainerLauncherDescriptor.create("plugin", null)}, + new TaskCommunicatorDescriptor[]{TaskCommunicatorDescriptor.create("plugin", null)}); Vertex v1 = Vertex.create("v1", pd1, 10, Resource.newInstance(1024, 1)).setExecutionContext(v1Context); Vertex v2 = Vertex.create("v2", pd2, 1, Resource.newInstance(1024, 1)); @@ -347,7 +456,7 @@ public class TestDAGPlan { dag.addVertex(v1).addVertex(v2).addEdge(edge); dag.setExecutionContext(defaultExecutionContext); - DAGPlan dagProto = dag.createDag(new TezConfiguration(), null, null, null, true); + DAGPlan dagProto = dag.createDag(new TezConfiguration(), null, null, null, true, null, servicePluginsDescriptor); assertEquals(2, dagProto.getVertexCount()); assertEquals(1, dagProto.getEdgeCount()); http://git-wip-us.apache.org/repos/asf/tez/blob/2ecffa71/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java index e37f849..6f795fc 100644 --- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java +++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java @@ 
-33,16 +33,13 @@ import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.tez.common.TezCommonUtils; import org.apache.tez.dag.api.Vertex.VertexExecutionContext; -import org.apache.tez.dag.api.records.DAGProtos; import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto; import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto; import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto; import org.apache.tez.dag.api.records.DAGProtos.VertexExecutionContextProto; -import org.apache.tez.serviceplugins.api.ContainerLauncher; import org.apache.tez.serviceplugins.api.ContainerLauncherDescriptor; import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor; import org.apache.tez.serviceplugins.api.TaskCommunicatorDescriptor; -import org.apache.tez.serviceplugins.api.TaskScheduler; import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor; import org.junit.Assert; import org.junit.Test; @@ -152,7 +149,7 @@ public class TestDagTypeConverters { // Uber-execution servicePluginsDescriptor = ServicePluginsDescriptor.create(true); - proto = DagTypeConverters.convertServicePluginDescriptoToProto(servicePluginsDescriptor); + proto = DagTypeConverters.convertServicePluginDescriptorToProto(servicePluginsDescriptor); assertTrue(proto.hasUberEnabled()); assertTrue(proto.hasContainersEnabled()); assertTrue(proto.getUberEnabled()); @@ -168,7 +165,7 @@ public class TestDagTypeConverters { servicePluginsDescriptor = ServicePluginsDescriptor.create(taskSchedulers, containerLaunchers, taskComms); - proto = DagTypeConverters.convertServicePluginDescriptoToProto(servicePluginsDescriptor); + proto = DagTypeConverters.convertServicePluginDescriptorToProto(servicePluginsDescriptor); assertTrue(proto.hasUberEnabled()); assertTrue(proto.hasContainersEnabled()); assertFalse(proto.getUberEnabled()); @@ -185,7 +182,7 @@ public class TestDagTypeConverters { servicePluginsDescriptor = 
ServicePluginsDescriptor.create(taskSchedulers, containerLaunchers, taskComms); - proto = DagTypeConverters.convertServicePluginDescriptoToProto(servicePluginsDescriptor); + proto = DagTypeConverters.convertServicePluginDescriptorToProto(servicePluginsDescriptor); assertTrue(proto.hasUberEnabled()); assertTrue(proto.hasContainersEnabled()); assertFalse(proto.getUberEnabled()); @@ -201,7 +198,7 @@ public class TestDagTypeConverters { servicePluginsDescriptor = ServicePluginsDescriptor.create(false, true, taskSchedulers, containerLaunchers, taskComms); - proto = DagTypeConverters.convertServicePluginDescriptoToProto(servicePluginsDescriptor); + proto = DagTypeConverters.convertServicePluginDescriptorToProto(servicePluginsDescriptor); assertTrue(proto.hasUberEnabled()); assertTrue(proto.hasContainersEnabled()); assertTrue(proto.getUberEnabled()); http://git-wip-us.apache.org/repos/asf/tez/blob/2ecffa71/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java ---------------------------------------------------------------------- diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java index 1fb7ff9..d6ef901 100644 --- a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java +++ b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java @@ -45,7 +45,6 @@ import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.api.records.DAGProtos; import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto; import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair; -import org.apache.tez.serviceplugins.api.TaskSchedulerContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -271,7 +270,7 @@ public class TezUtilsInternal { switch (taskAttemptEndReason) { case COMMUNICATION_ERROR: return TaskAttemptTerminationCause.COMMUNICATION_ERROR; - case SERVICE_BUSY: + case EXECUTOR_BUSY: return 
TaskAttemptTerminationCause.SERVICE_BUSY; case INTERNAL_PREEMPTION: return TaskAttemptTerminationCause.INTERNAL_PREEMPTION; @@ -301,7 +300,7 @@ public class TezUtilsInternal { case COMMUNICATION_ERROR: return TaskAttemptEndReason.COMMUNICATION_ERROR; case SERVICE_BUSY: - return TaskAttemptEndReason.SERVICE_BUSY; + return TaskAttemptEndReason.EXECUTOR_BUSY; case INTERNAL_PREEMPTION: return TaskAttemptEndReason.INTERNAL_PREEMPTION; case EXTERNAL_PREEMPTION: http://git-wip-us.apache.org/repos/asf/tez/blob/2ecffa71/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java index 4fc541c..f1f683b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java @@ -14,6 +14,7 @@ package org.apache.tez.dag.api; +import javax.annotation.Nullable; import java.net.InetSocketAddress; import java.util.Map; @@ -115,8 +116,10 @@ public abstract class TaskCommunicator implements ServicePluginLifecycle { * * @param containerId the associated containerId * @param endReason the end reason for the container completing + * @param diagnostics diagnostics associated with the container end */ - public abstract void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason); + public abstract void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason, + @Nullable String diagnostics); /** * Register a task attempt to execute on a container @@ -138,14 +141,15 @@ public abstract class TaskCommunicator implements ServicePluginLifecycle { /** * Register the completion of a task. 
This may be a result of preemption, the container dying, - * the - * node dying, the task completing to success + * the node dying, the task completing to success * * @param taskAttemptID the task attempt which has completed / needs to be completed * @param endReason the endReason for the task attempt. + * @param diagnostics diagnostics associated with the task end */ public abstract void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID, - TaskAttemptEndReason endReason); + TaskAttemptEndReason endReason, + @Nullable String diagnostics); /** * Return the address, if any, that the service listens on http://git-wip-us.apache.org/repos/asf/tez/blob/2ecffa71/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java index 0a684e7..e81ba2b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java @@ -160,7 +160,7 @@ public interface TaskCommunicatorContext { * * @return the name of the currently executing dag */ - String getCurretnDagName(); + String getCurrentDagName(); /** * Get the name of the Input vertices for the specified vertex. 
http://git-wip-us.apache.org/repos/asf/tez/blob/2ecffa71/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java index 2eec2fb..761bdb0 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java @@ -34,9 +34,9 @@ public interface TaskAttemptListener { void registerTaskAttempt(AMContainerTask amContainerTask, ContainerId containerId, int taskCommId); - void unregisterRunningContainer(ContainerId containerId, int taskCommId, ContainerEndReason endReason); + void unregisterRunningContainer(ContainerId containerId, int taskCommId, ContainerEndReason endReason, String diagnostics); - void unregisterTaskAttempt(TezTaskAttemptID attemptID, int taskCommId, TaskAttemptEndReason endReason); + void unregisterTaskAttempt(TezTaskAttemptID attemptID, int taskCommId, TaskAttemptEndReason endReason, String diagnostics); void dagComplete(DAG dag); http://git-wip-us.apache.org/repos/asf/tez/blob/2ecffa71/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java index ad6f2c4..2f6e93c 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java @@ -176,7 +176,6 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements try { Constructor<? 
extends TaskCommunicator> ctor = taskCommClazz.getConstructor(TaskCommunicatorContext.class); - ctor.setAccessible(true); return ctor.newInstance(taskCommunicatorContext); } catch (NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException e) { throw new TezUncheckedException(e); @@ -366,7 +365,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements } @Override - public void unregisterRunningContainer(ContainerId containerId, int taskCommId, ContainerEndReason endReason) { + public void unregisterRunningContainer(ContainerId containerId, int taskCommId, ContainerEndReason endReason, String diagnostics) { if (LOG.isDebugEnabled()) { LOG.debug("Unregistering Container from TaskAttemptListener: " + containerId); } @@ -374,7 +373,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements if (containerInfo.taskAttemptId != null) { registeredAttempts.remove(containerInfo.taskAttemptId); } - taskCommunicators[taskCommId].registerContainerEnd(containerId, endReason); + taskCommunicators[taskCommId].registerContainerEnd(containerId, endReason, diagnostics); } @Override @@ -408,7 +407,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements } @Override - public void unregisterTaskAttempt(TezTaskAttemptID attemptId, int taskCommId, TaskAttemptEndReason endReason) { + public void unregisterTaskAttempt(TezTaskAttemptID attemptId, int taskCommId, TaskAttemptEndReason endReason, String diagnostics) { ContainerId containerId = registeredAttempts.remove(attemptId); if (containerId == null) { LOG.warn("Unregister task attempt: " + attemptId + " from unknown container"); @@ -422,7 +421,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements } // Explicitly putting in a new entry so that synchronization is not required on the existing element in the map. 
registeredContainers.put(containerId, NULL_CONTAINER_INFO); - taskCommunicators[taskCommId].unregisterRunningTaskAttempt(attemptId, endReason); + taskCommunicators[taskCommId].unregisterRunningTaskAttempt(attemptId, endReason, diagnostics); } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/2ecffa71/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java index 0f10305..c56311c 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java @@ -27,6 +27,7 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.tez.dag.api.UserPayload; +import org.apache.tez.dag.app.rm.container.AMContainer; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.apache.tez.dag.api.TaskCommunicatorContext; import org.apache.tez.dag.api.TaskHeartbeatRequest; @@ -96,7 +97,13 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver @Override public boolean isKnownContainer(ContainerId containerId) { - return context.getAllContainers().get(containerId) != null; + AMContainer amContainer = context.getAllContainers().get(containerId); + if (amContainer == null || + amContainer.getTaskCommunicatorIdentifier() != taskCommunicatorIndex) { + return false; + } else { + return true; + } } @Override @@ -106,7 +113,9 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver @Override public void containerAlive(ContainerId containerId) { - taskAttemptListener.containerAlive(containerId); + if (isKnownContainer(containerId)) 
{ + taskAttemptListener.containerAlive(containerId); + } } @Override @@ -136,7 +145,7 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver } @Override - public String getCurretnDagName() { + public String getCurrentDagName() { return getDag().getName(); } http://git-wip-us.apache.org/repos/asf/tez/blob/2ecffa71/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java index d3f1c44..9ecee5b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java @@ -36,8 +36,12 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.util.ConverterUtils; -import org.apache.tez.common.*; import org.apache.tez.common.ContainerContext; +import org.apache.tez.common.ContainerTask; +import org.apache.tez.common.TezConverterUtils; +import org.apache.tez.common.TezLocalResource; +import org.apache.tez.common.TezTaskUmbilicalProtocol; +import org.apache.tez.common.TezUtils; import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.common.security.TokenCache; @@ -199,7 +203,7 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator { } @Override - public void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason) { + public void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason, String diagnostics) { ContainerInfo containerInfo = registeredContainers.remove(containerId); if (containerInfo != null) { synchronized(containerInfo) { @@ -245,7 
+249,7 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator { @Override - public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID, TaskAttemptEndReason endReason) { + public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID, TaskAttemptEndReason endReason, String diagnostics) { TaskAttempt taskAttempt = new TaskAttempt(taskAttemptID); ContainerId containerId = attemptToContainerMap.remove(taskAttempt); if(containerId == null) { http://git-wip-us.apache.org/repos/asf/tez/blob/2ecffa71/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java index 17f5675..6b474ff 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java @@ -82,7 +82,6 @@ import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair; import org.apache.tez.dag.api.records.DAGProtos.PlanVertexGroupInfo; import org.apache.tez.dag.api.records.DAGProtos.VertexPlan; import org.apache.tez.dag.app.AppContext; -import org.apache.tez.dag.app.DAGAppMasterState; import org.apache.tez.dag.app.TaskAttemptListener; import org.apache.tez.dag.app.TaskHeartbeatHandler; import org.apache.tez.dag.app.dag.DAG; @@ -180,6 +179,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, private final AppContext appContext; private final UserGroupInformation dagUGI; private final ACLManager aclManager; + private final org.apache.tez.dag.api.Vertex.VertexExecutionContext defaultExecutionContext; @VisibleForTesting StateChangeNotifier entityUpdateTracker; @@ -538,6 +538,11 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, // this is only for recovery in case it does not call the init transition this.startDAGCpuTime = 
appContext.getCumulativeCPUTime(); this.startDAGGCTime = appContext.getCumulativeGCTime(); + if (jobPlan.hasDefaultExecutionContext()) { + defaultExecutionContext = DagTypeConverters.convertFromProto(jobPlan.getDefaultExecutionContext()); + } else { + defaultExecutionContext = null; + } this.taskSpecificLaunchCmdOption = new TaskSpecificLaunchCmdOption(dagConf); // This "this leak" is okay because the retained pointer is in an @@ -718,11 +723,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, @Override public org.apache.tez.dag.api.Vertex.VertexExecutionContext getDefaultExecutionContext() { - if (jobPlan.hasDefaultExecutionContext()) { - return DagTypeConverters.convertFromProto(jobPlan.getDefaultExecutionContext()); - } else { - return null; - } + return defaultExecutionContext; } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/2ecffa71/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index 65ea3fb..c6d8a7e 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -227,7 +227,7 @@ public class TaskAttemptImpl implements TaskAttempt, TaskAttemptEventType.TA_KILL_REQUEST, new TerminatedBeforeRunningTransition(KILLED_HELPER)) .addTransition(TaskAttemptStateInternal.START_WAIT, - TaskAttemptStateInternal.KILL_IN_PROGRESS, + TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_KILLED, new TerminatedBeforeRunningTransition(KILLED_HELPER)) .addTransition(TaskAttemptStateInternal.START_WAIT, @@ -267,7 +267,7 @@ public class TaskAttemptImpl implements TaskAttempt, TaskAttemptEventType.TA_KILL_REQUEST, new TerminatedWhileRunningTransition(KILLED_HELPER)) 
.addTransition(TaskAttemptStateInternal.RUNNING, - TaskAttemptStateInternal.KILL_IN_PROGRESS, + TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_KILLED, new TerminatedWhileRunningTransition(KILLED_HELPER)) .addTransition(TaskAttemptStateInternal.RUNNING, @@ -1095,7 +1095,7 @@ public class TaskAttemptImpl implements TaskAttempt, // Compute node/rack location request even if re-scheduled. Set<String> racks = new HashSet<String>(); - // TODO Post TEZ-2003. Allow for a policy in the VMPlugin to define localicty for different attempts. + // TODO Post TEZ-2003. Allow for a policy in the VMPlugin to define locality for different attempts. TaskLocationHint locationHint = ta.getTaskLocationHint(); if (locationHint != null) { if (locationHint.getRacks() != null) { @@ -1266,6 +1266,7 @@ public class TaskAttemptImpl implements TaskAttempt, if (sendSchedulerEvent()) { ta.sendEvent(new AMSchedulerEventTAEnded(ta, ta.containerId, helper .getTaskAttemptState(), TezUtilsInternal.toTaskAttemptEndReason(ta.terminationCause), + ta instanceof DiagnosableEvent ? ((DiagnosableEvent)ta).getDiagnosticInfo() : null, ta.getVertex().getTaskSchedulerIdentifier())); } } @@ -1348,7 +1349,7 @@ public class TaskAttemptImpl implements TaskAttempt, // Inform the Scheduler. ta.sendEvent(new AMSchedulerEventTAEnded(ta, ta.containerId, - TaskAttemptState.SUCCEEDED, null, ta.getVertex().getTaskSchedulerIdentifier())); + TaskAttemptState.SUCCEEDED, null, null, ta.getVertex().getTaskSchedulerIdentifier())); // Inform the task. 
ta.sendEvent(new TaskEventTAUpdate(ta.attemptId, http://git-wip-us.apache.org/repos/asf/tez/blob/2ecffa71/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java index d0cee21..b56bd5b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java @@ -70,7 +70,7 @@ public class ContainerLauncherRouter extends AbstractService TaskAttemptListener taskAttemptListener, String workingDirectory, List<NamedEntityDescriptor> containerLauncherDescriptors, - boolean isPureLocalMode) throws UnknownHostException { + boolean isPureLocalMode) { super(ContainerLauncherRouter.class.getName()); this.appContext = context; @@ -101,8 +101,7 @@ public class ContainerLauncherRouter extends AbstractService TaskAttemptListener taskAttemptListener, String workingDirectory, int containerLauncherIndex, - boolean isPureLocalMode) throws - UnknownHostException { + boolean isPureLocalMode) { if (containerLauncherDescriptor.getEntityName().equals( TezConstants.getTezYarnServicePluginName())) { return createYarnContainerLauncher(containerLauncherContext); @@ -126,15 +125,18 @@ public class ContainerLauncherRouter extends AbstractService AppContext context, TaskAttemptListener taskAttemptListener, String workingDirectory, - boolean isPureLocalMode) throws - UnknownHostException { + boolean isPureLocalMode) { LOG.info("Creating LocalContainerLauncher"); // TODO Post TEZ-2003. LocalContainerLauncher is special cased, since it makes use of // extensive internals which are only available at runtime. Will likely require // some kind of runtime binding of parameters in the payload to work correctly. 
- return - new LocalContainerLauncher(containerLauncherContext, context, taskAttemptListener, - workingDirectory, isPureLocalMode); + try { + return + new LocalContainerLauncher(containerLauncherContext, context, taskAttemptListener, + workingDirectory, isPureLocalMode); + } catch (UnknownHostException e) { + throw new TezUncheckedException(e); + } } @VisibleForTesting @@ -149,7 +151,6 @@ public class ContainerLauncherRouter extends AbstractService try { Constructor<? extends ContainerLauncher> ctor = containerLauncherClazz .getConstructor(ContainerLauncherContext.class); - ctor.setAccessible(true); return ctor.newInstance(containerLauncherContext); } catch (NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException e) { throw new TezUncheckedException(e); http://git-wip-us.apache.org/repos/asf/tez/blob/2ecffa71/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java index 33763e7..ccc5465 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java @@ -29,15 +29,17 @@ public class AMSchedulerEventTAEnded extends AMSchedulerEvent { private final ContainerId containerId; private final TaskAttemptState state; private final TaskAttemptEndReason taskAttemptEndReason; + private final String diagnostics; private final int schedulerId; public AMSchedulerEventTAEnded(TaskAttempt attempt, ContainerId containerId, - TaskAttemptState state, TaskAttemptEndReason taskAttemptEndReason, int schedulerId) { + TaskAttemptState state, TaskAttemptEndReason taskAttemptEndReason, String diagnostics, int schedulerId) { super(AMSchedulerEventType.S_TA_ENDED); this.attempt = attempt; 
this.containerId = containerId; this.state = state; this.taskAttemptEndReason = taskAttemptEndReason; + this.diagnostics = diagnostics; this.schedulerId = schedulerId; } @@ -64,4 +66,8 @@ public class AMSchedulerEventTAEnded extends AMSchedulerEvent { public TaskAttemptEndReason getTaskAttemptEndReason() { return taskAttemptEndReason; } + + public String getDiagnostics() { + return diagnostics; + } } http://git-wip-us.apache.org/repos/asf/tez/blob/2ecffa71/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java index 395589c..668c759 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java @@ -131,7 +131,7 @@ public class LocalTaskSchedulerService extends TaskScheduler { } @Override - public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEndReason endReason) { + public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEndReason endReason, String diagnostics) { return taskRequestHandler.addDeallocateTaskRequest(task); } http://git-wip-us.apache.org/repos/asf/tez/blob/2ecffa71/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java index 0f19379..a127ddf 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java @@ -286,7 +286,7 @@ public class 
TaskSchedulerEventHandler extends AbstractService implements TaskAttempt attempt = event.getAttempt(); // Propagate state and failure cause (if any) when informing the scheduler about the de-allocation. boolean wasContainerAllocated = taskSchedulers[event.getSchedulerId()] - .deallocateTask(attempt, false, event.getTaskAttemptEndReason()); + .deallocateTask(attempt, false, event.getTaskAttemptEndReason(), event.getDiagnostics()); // use stored value of container id in case the scheduler has removed this // assignment because the task has been deallocated earlier. // retroactive case @@ -331,7 +331,7 @@ public class TaskSchedulerEventHandler extends AbstractService implements } boolean wasContainerAllocated = taskSchedulers[event.getSchedulerId()].deallocateTask(attempt, - true, null); + true, null, event.getDiagnostics()); if (!wasContainerAllocated) { LOG.error("De-allocated successful task: " + attempt.getID() + ", but TaskScheduler reported no container assigned to task"); @@ -436,7 +436,6 @@ public class TaskSchedulerEventHandler extends AbstractService implements try { Constructor<? 
extends TaskScheduler> ctor = taskSchedulerClazz .getConstructor(TaskSchedulerContext.class); - ctor.setAccessible(true); return ctor.newInstance(taskSchedulerContext); } catch (NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException e) { throw new TezUncheckedException(e); http://git-wip-us.apache.org/repos/asf/tez/blob/2ecffa71/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java index b4d1f26..2a5f937 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java @@ -989,7 +989,8 @@ public class YarnTaskSchedulerService extends TaskScheduler */ @Override public boolean deallocateTask(Object task, boolean taskSucceeded, - TaskAttemptEndReason endReason) { + TaskAttemptEndReason endReason, + String diagnostics) { Map<CookieContainerRequest, Container> assignedContainers = null; synchronized (this) { @@ -1207,7 +1208,7 @@ public class YarnTaskSchedulerService extends TaskScheduler CookieContainerRequest request = entry.getValue(); if (request.getPriority().equals(lowestPriNewContainer.getPriority())) { LOG.info("Resending request for task again: " + task); - deallocateTask(task, true, null); + deallocateTask(task, true, null, null); allocateTask(task, request.getCapability(), (request.getNodes() == null ? 
null : request.getNodes().toArray(new String[request.getNodes().size()])), http://git-wip-us.apache.org/repos/asf/tez/blob/2ecffa71/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java index aeacf84..99cec2b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java @@ -631,14 +631,14 @@ public class AMContainerImpl implements AMContainer { SingleArcTransition<AMContainerImpl, AMContainerEvent> { @Override public void transition(AMContainerImpl container, AMContainerEvent cEvent) { + AMContainerEventLaunchFailed event = (AMContainerEventLaunchFailed) cEvent; if (container.currentAttempt != null) { - AMContainerEventLaunchFailed event = (AMContainerEventLaunchFailed) cEvent; // for a properly setup cluster this should almost always be an app error // need to differentiate between launch failed due to framework/cluster or app container.sendTerminatingToTaskAttempt(container.currentAttempt, event.getMessage(), TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED); } - container.unregisterFromTAListener(ContainerEndReason.LAUNCH_FAILED); + container.unregisterFromTAListener(ContainerEndReason.LAUNCH_FAILED, event.getMessage()); container.deAllocate(); } } @@ -668,7 +668,7 @@ public class AMContainerImpl implements AMContainer { } container.containerLocalResources = null; container.additionalLocalResources = null; - container.unregisterFromTAListener(event.getContainerEndReason()); + container.unregisterFromTAListener(event.getContainerEndReason(), event.getDiagnostics()); String diag = event.getDiagnostics(); if (!(diag == null || diag.equals(""))) { LOG.info("Container " + 
container.getContainerId() @@ -694,7 +694,7 @@ public class AMContainerImpl implements AMContainer { container.sendTerminatingToTaskAttempt(container.currentAttempt, getMessage(container, cEvent), TaskAttemptTerminationCause.CONTAINER_STOPPED); } - container.unregisterFromTAListener(ContainerEndReason.OTHER); + container.unregisterFromTAListener(ContainerEndReason.OTHER, getMessage(container, cEvent)); container.logStopped(container.currentAttempt == null ? ContainerExitStatus.SUCCESS : ContainerExitStatus.INVALID); @@ -746,7 +746,11 @@ public class AMContainerImpl implements AMContainer { @Override public void transition(AMContainerImpl container, AMContainerEvent cEvent) { super.transition(container, cEvent); - container.unregisterFromTAListener(ContainerEndReason.NODE_FAILED); + String errorMessage = "Node " + container.getContainer().getNodeId() + " failed. "; + if (cEvent instanceof DiagnosableEvent) { + errorMessage += ((DiagnosableEvent) cEvent).getDiagnosticInfo(); + } + container.unregisterFromTAListener(ContainerEndReason.NODE_FAILED, errorMessage); container.deAllocate(); } } @@ -756,14 +760,15 @@ public class AMContainerImpl implements AMContainer { @Override public void transition(AMContainerImpl container, AMContainerEvent cEvent) { super.transition(container, cEvent); + String errorMessage = "Container " + container.getContainerId() + + " hit an invalid transition - " + cEvent.getType() + " at " + + container.getState(); if (container.currentAttempt != null) { container.sendTerminatingToTaskAttempt(container.currentAttempt, - "Container " + container.getContainerId() + - " hit an invalid transition - " + cEvent.getType() + " at " + - container.getState(), TaskAttemptTerminationCause.FRAMEWORK_ERROR); + errorMessage, TaskAttemptTerminationCause.FRAMEWORK_ERROR); } container.logStopped(ContainerExitStatus.ABORTED); - container.unregisterFromTAListener(ContainerEndReason.FRAMEWORK_ERROR); + 
container.unregisterFromTAListener(ContainerEndReason.FRAMEWORK_ERROR, errorMessage); container.sendStopRequestToNM(); } } @@ -835,7 +840,12 @@ public class AMContainerImpl implements AMContainer { public void transition(AMContainerImpl container, AMContainerEvent cEvent) { AMContainerEventAssignTA event = (AMContainerEventAssignTA) cEvent; - container.unregisterAttemptFromListener(container.currentAttempt, TaskAttemptEndReason.FRAMEWORK_ERROR); + String errorMessage = "AMScheduler Error: Multiple simultaneous " + + "taskAttempt allocations to: " + container.getContainerId() + + ". Attempts: " + container.getCurrentTaskAttempt() + ", " + event.getTaskAttemptId() + + ". Current state: " + container.getState(); + container.unregisterAttemptFromListener(container.currentAttempt, + TaskAttemptEndReason.FRAMEWORK_ERROR, errorMessage); container.handleExtraTAAssign(event, container.currentAttempt); } } @@ -846,7 +856,7 @@ public class AMContainerImpl implements AMContainer { public void transition(AMContainerImpl container, AMContainerEvent cEvent) { container.lastTaskFinishTime = System.currentTimeMillis(); container.completedAttempts.add(container.currentAttempt); - container.unregisterAttemptFromListener(container.currentAttempt, TaskAttemptEndReason.OTHER); + container.unregisterAttemptFromListener(container.currentAttempt, TaskAttemptEndReason.OTHER, null); container.currentAttempt = null; } } @@ -863,7 +873,9 @@ public class AMContainerImpl implements AMContainer { container.sendTerminatedToTaskAttempt(container.currentAttempt, getMessage(container, event), event.getTerminationCause()); } - container.unregisterAttemptFromListener(container.currentAttempt, TezUtilsInternal.toTaskAttemptEndReason(event.getTerminationCause())); + container.unregisterAttemptFromListener(container.currentAttempt, + TezUtilsInternal.toTaskAttemptEndReason(event.getTerminationCause()), + getMessage(container, event)); container.registerFailedAttempt(container.currentAttempt); 
container.currentAttempt= null; super.transition(container, cEvent); @@ -873,7 +885,8 @@ public class AMContainerImpl implements AMContainer { protected static class StopRequestAtRunningTransition extends StopRequestAtIdleTransition { public void transition(AMContainerImpl container, AMContainerEvent cEvent) { - container.unregisterAttemptFromListener(container.currentAttempt, TaskAttemptEndReason.OTHER); + container.unregisterAttemptFromListener(container.currentAttempt, TaskAttemptEndReason.OTHER, + getMessage(container, cEvent)); super.transition(container, cEvent); } } @@ -894,7 +907,8 @@ public class AMContainerImpl implements AMContainer { @Override public void transition(AMContainerImpl container, AMContainerEvent cEvent) { super.transition(container, cEvent); - container.unregisterAttemptFromListener(container.currentAttempt, TaskAttemptEndReason.NODE_FAILED); + String errorMessage = "Node " + container.getContainer().getNodeId() + " failed. "; + container.unregisterAttemptFromListener(container.currentAttempt, TaskAttemptEndReason.NODE_FAILED, errorMessage); } } @@ -903,11 +917,13 @@ public class AMContainerImpl implements AMContainer { @Override public void transition(AMContainerImpl container, AMContainerEvent cEvent) { super.transition(container, cEvent); - container.unregisterAttemptFromListener(container.currentAttempt, TaskAttemptEndReason.FRAMEWORK_ERROR); + String errorMessage = "Container " + container.getContainerId() + + " hit an invalid transition - " + cEvent.getType() + " at " + + container.getState(); + container.unregisterAttemptFromListener(container.currentAttempt, + TaskAttemptEndReason.FRAMEWORK_ERROR, errorMessage); container.sendTerminatingToTaskAttempt(container.currentAttempt, - "Container " + container.getContainerId() + - " hit an invalid transition - " + cEvent.getType() + " at " + - container.getState(), TaskAttemptTerminationCause.FRAMEWORK_ERROR); + errorMessage, TaskAttemptTerminationCause.FRAMEWORK_ERROR); } } @@ -1029,7 
+1045,7 @@ public class AMContainerImpl implements AMContainer { LOG.warn(errorMessage); this.logStopped(ContainerExitStatus.INVALID); this.sendStopRequestToNM(); - this.unregisterFromTAListener(ContainerEndReason.FRAMEWORK_ERROR); + this.unregisterFromTAListener(ContainerEndReason.FRAMEWORK_ERROR, errorMessage); this.unregisterFromContainerListener(); } @@ -1087,8 +1103,8 @@ public class AMContainerImpl implements AMContainer { container.getNodeId(), container.getContainerToken(), launcherId, schedulerId, taskCommId)); } - protected void unregisterAttemptFromListener(TezTaskAttemptID attemptId, TaskAttemptEndReason endReason) { - taskAttemptListener.unregisterTaskAttempt(attemptId, taskCommId, endReason); + protected void unregisterAttemptFromListener(TezTaskAttemptID attemptId, TaskAttemptEndReason endReason, String diagnostics) { + taskAttemptListener.unregisterTaskAttempt(attemptId, taskCommId, endReason, diagnostics); } protected void registerAttemptWithListener(AMContainerTask amContainerTask) { @@ -1099,8 +1115,8 @@ public class AMContainerImpl implements AMContainer { taskAttemptListener.registerRunningContainer(containerId, taskCommId); } - protected void unregisterFromTAListener(ContainerEndReason endReason) { - this.taskAttemptListener.unregisterRunningContainer(containerId, taskCommId, endReason); + protected void unregisterFromTAListener(ContainerEndReason endReason, String diagnostics) { + this.taskAttemptListener.unregisterRunningContainer(containerId, taskCommId, endReason, diagnostics); } protected void registerWithContainerListener() { http://git-wip-us.apache.org/repos/asf/tez/blob/2ecffa71/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java index 0668ff2..32e515b 100644 --- 
a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java @@ -53,7 +53,7 @@ public class AMNodeTracker extends AbstractService implements @SuppressWarnings("rawtypes") public AMNodeTracker(EventHandler eventHandler, AppContext appContext) { - super("AMNodeMap"); + super("AMNodeTracker"); this.perSourceNodeTrackers = new ConcurrentHashMap<>(); this.eventHandler = eventHandler; this.appContext = appContext; http://git-wip-us.apache.org/repos/asf/tez/blob/2ecffa71/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java index 4d404b9..5159aff 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java @@ -184,12 +184,12 @@ public class TestTaskAttemptListenerImplTezDag { assertEquals(taskSpec, containerTask.getTaskSpec()); // Task unregistered. Should respond to heartbeats - taskAttemptListener.unregisterTaskAttempt(taskAttemptID, 0, TaskAttemptEndReason.OTHER); + taskAttemptListener.unregisterTaskAttempt(taskAttemptID, 0, TaskAttemptEndReason.OTHER, null); containerTask = tezUmbilical.getTask(containerContext2); assertNull(containerTask); // Container unregistered. 
Should send a shouldDie = true - taskAttemptListener.unregisterRunningContainer(containerId2, 0, ContainerEndReason.OTHER); + taskAttemptListener.unregisterRunningContainer(containerId2, 0, ContainerEndReason.OTHER, null); containerTask = tezUmbilical.getTask(containerContext2); assertTrue(containerTask.shouldDie()); @@ -203,7 +203,7 @@ public class TestTaskAttemptListenerImplTezDag { doReturn(taskAttemptId2).when(taskSpec2).getTaskAttemptID(); AMContainerTask amContainerTask2 = new AMContainerTask(taskSpec, null, null, false, 0); taskAttemptListener.registerTaskAttempt(amContainerTask2, containerId3, 0); - taskAttemptListener.unregisterRunningContainer(containerId3, 0, ContainerEndReason.OTHER); + taskAttemptListener.unregisterRunningContainer(containerId3, 0, ContainerEndReason.OTHER, null); containerTask = tezUmbilical.getTask(containerContext3); assertTrue(containerTask.shouldDie()); } http://git-wip-us.apache.org/repos/asf/tez/blob/2ecffa71/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java index abb5e42..74468f2 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java @@ -41,7 +41,6 @@ import org.apache.tez.common.TezUtils; import org.apache.tez.dag.api.NamedEntityDescriptor; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezConstants; -import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.apache.tez.dag.app.dag.DAG; @@ -110,7 +109,7 @@ public class TestTaskAttemptListenerImplTezDag2 { 
taskAttemptListener .taskFailed(taskAttemptId1, TaskAttemptEndReason.COMMUNICATION_ERROR, "Diagnostics1"); taskAttemptListener - .taskKilled(taskAttemptId2, TaskAttemptEndReason.SERVICE_BUSY, "Diagnostics2"); + .taskKilled(taskAttemptId2, TaskAttemptEndReason.EXECUTOR_BUSY, "Diagnostics2"); ArgumentCaptor<Event> argumentCaptor = ArgumentCaptor.forClass(Event.class); verify(eventHandler, times(2)).handle(argumentCaptor.capture()); http://git-wip-us.apache.org/repos/asf/tez/blob/2ecffa71/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorContextImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorContextImpl.java new file mode 100644 index 0000000..1545eb4 --- /dev/null +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorContextImpl.java @@ -0,0 +1,85 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.dag.app; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.verify; + +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.tez.common.ContainerSignatureMatcher; +import org.apache.tez.dag.api.TaskCommunicatorContext; +import org.apache.tez.dag.app.rm.container.AMContainerMap; +import org.junit.Test; + +public class TestTaskCommunicatorContextImpl { + + @Test(timeout = 5000) + public void testIsKnownContainer() { + AppContext appContext = mock(AppContext.class); + TaskAttemptListenerImpTezDag tal = mock(TaskAttemptListenerImpTezDag.class); + + AMContainerMap amContainerMap = new AMContainerMap(mock(ContainerHeartbeatHandler.class), tal, mock( + ContainerSignatureMatcher.class), appContext); + + doReturn(amContainerMap).when(appContext).getAllContainers(); + + ContainerId containerId01 = mock(ContainerId.class); + Container container01 = mock(Container.class); + doReturn(containerId01).when(container01).getId(); + + ContainerId containerId11 = mock(ContainerId.class); + Container container11 = mock(Container.class); + doReturn(containerId11).when(container11).getId(); + + amContainerMap.addContainerIfNew(container01, 0, 0, 0); + amContainerMap.addContainerIfNew(container11, 1, 1, 1); + + TaskCommunicatorContext taskCommContext0 = new TaskCommunicatorContextImpl(appContext, tal, null, 0); + TaskCommunicatorContext taskCommContext1 = new TaskCommunicatorContextImpl(appContext, tal, null, 1); + + assertTrue(taskCommContext0.isKnownContainer(containerId01)); + assertFalse(taskCommContext0.isKnownContainer(containerId11)); + + assertFalse(taskCommContext1.isKnownContainer(containerId01)); + 
assertTrue(taskCommContext1.isKnownContainer(containerId11)); + + taskCommContext0.containerAlive(containerId01); + verify(tal).containerAlive(containerId01); + reset(tal); + + taskCommContext0.containerAlive(containerId11); + verify(tal, never()).containerAlive(containerId11); + reset(tal); + + taskCommContext1.containerAlive(containerId01); + verify(tal, never()).containerAlive(containerId01); + reset(tal); + + taskCommContext1.containerAlive(containerId11); + verify(tal).containerAlive(containerId11); + reset(tal); + + taskCommContext1.containerAlive(containerId01); + verify(tal, never()).containerAlive(containerId01); + reset(tal); + + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/2ecffa71/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java index c76aa50..4f68fab 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java @@ -328,7 +328,7 @@ public class TestTaskCommunicatorManager { } @Override - public void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason) { + public void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason, String diagnostics) { } @@ -342,7 +342,7 @@ public class TestTaskCommunicatorManager { @Override public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID, - TaskAttemptEndReason endReason) { + TaskAttemptEndReason endReason, String diagnostics) { } http://git-wip-us.apache.org/repos/asf/tez/blob/2ecffa71/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java ---------------------------------------------------------------------- diff --git 
a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java index 2bf1c85..947ea93 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java @@ -308,6 +308,7 @@ public class TestTaskAttempt { resource, createFakeContainerContext(), false); NodeId nid = NodeId.newInstance("127.0.0.1", 0); + @SuppressWarnings("deprecation") ContainerId contId = ContainerId.newInstance(appAttemptId, 3); Container container = mock(Container.class); when(container.getId()).thenReturn(contId); @@ -351,6 +352,7 @@ public class TestTaskAttempt { Resource resource = Resource.newInstance(1024, 1); NodeId nid = NodeId.newInstance("127.0.0.1", 0); + @SuppressWarnings("deprecation") ContainerId contId = ContainerId.newInstance(appAttemptId, 3); Container container = mock(Container.class); when(container.getId()).thenReturn(contId); @@ -453,6 +455,7 @@ public class TestTaskAttempt { Resource resource = Resource.newInstance(1024, 1); NodeId nid = NodeId.newInstance("127.0.0.1", 0); + @SuppressWarnings("deprecation") ContainerId contId = ContainerId.newInstance(appAttemptId, 3); Container container = mock(Container.class); when(container.getId()).thenReturn(contId); @@ -519,6 +522,7 @@ public class TestTaskAttempt { Resource resource = Resource.newInstance(1024, 1); NodeId nid = NodeId.newInstance("127.0.0.1", 0); + @SuppressWarnings("deprecation") ContainerId contId = ContainerId.newInstance(appAttemptId, 3); Container container = mock(Container.class); when(container.getId()).thenReturn(contId); @@ -613,6 +617,7 @@ public class TestTaskAttempt { Resource resource = Resource.newInstance(1024, 1); NodeId nid = NodeId.newInstance("127.0.0.1", 0); + @SuppressWarnings("deprecation") ContainerId contId = ContainerId.newInstance(appAttemptId, 3); Container container = mock(Container.class); 
when(container.getId()).thenReturn(contId); @@ -745,6 +750,7 @@ public class TestTaskAttempt { Resource resource = Resource.newInstance(1024, 1); NodeId nid = NodeId.newInstance("127.0.0.1", 0); + @SuppressWarnings("deprecation") ContainerId contId = ContainerId.newInstance(appAttemptId, 3); Container container = mock(Container.class); when(container.getId()).thenReturn(contId); @@ -837,6 +843,7 @@ public class TestTaskAttempt { Resource resource = Resource.newInstance(1024, 1); NodeId nid = NodeId.newInstance("127.0.0.1", 0); + @SuppressWarnings("deprecation") ContainerId contId = ContainerId.newInstance(appAttemptId, 3); Container container = mock(Container.class); when(container.getId()).thenReturn(contId); @@ -933,6 +940,7 @@ public class TestTaskAttempt { Resource resource = Resource.newInstance(1024, 1); NodeId nid = NodeId.newInstance("127.0.0.1", 0); + @SuppressWarnings("deprecation") ContainerId contId = ContainerId.newInstance(appAttemptId, 3); Container container = mock(Container.class); when(container.getId()).thenReturn(contId); @@ -1037,6 +1045,7 @@ public class TestTaskAttempt { Resource resource = Resource.newInstance(1024, 1); NodeId nid = NodeId.newInstance("127.0.0.1", 0); + @SuppressWarnings("deprecation") ContainerId contId = ContainerId.newInstance(appAttemptId, 3); Container container = mock(Container.class); when(container.getId()).thenReturn(contId); @@ -1138,6 +1147,7 @@ public class TestTaskAttempt { Resource resource = Resource.newInstance(1024, 1); NodeId nid = NodeId.newInstance("127.0.0.1", 0); + @SuppressWarnings("deprecation") ContainerId contId = ContainerId.newInstance(appAttemptId, 3); Container container = mock(Container.class); when(container.getId()).thenReturn(contId); @@ -1342,7 +1352,7 @@ public class TestTaskAttempt { } } } - }; + } private class MockTaskAttemptImpl extends TaskAttemptImpl { 
http://git-wip-us.apache.org/repos/asf/tez/blob/2ecffa71/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherRouter.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherRouter.java b/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherRouter.java index 62a5f19..d0caf8c 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherRouter.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherRouter.java @@ -273,8 +273,7 @@ public class TestContainerLauncherRouter { TaskAttemptListener taskAttemptListener, String workingDirectory, int containerLauncherIndex, - boolean isPureLocalMode) throws - UnknownHostException { + boolean isPureLocalMode) { numContainerLaunchers.incrementAndGet(); boolean added = containerLauncherIndices.add(containerLauncherIndex); assertTrue("Cannot add multiple launchers with the same index", added); @@ -298,8 +297,7 @@ public class TestContainerLauncherRouter { AppContext context, TaskAttemptListener taskAttemptListener, String workingDirectory, - boolean isPureLocalMode) throws - UnknownHostException { + boolean isPureLocalMode) { uberContainerLauncherCreated.set(true); testContainerLaunchers.add(uberContainerlauncher); return uberContainerlauncher;
