Repository: tez Updated Branches: refs/heads/master 81dacf58e -> abb350c0c
TEZ-3574. Container reuse won't pickup extra dag level local resource. Contributed by Zhiyuan Yang. Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/abb350c0 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/abb350c0 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/abb350c0 Branch: refs/heads/master Commit: abb350c0c1b397fe05e0de403d706e10044af398 Parents: 81dacf5 Author: Siddharth Seth <[email protected]> Authored: Sun Jan 15 20:15:38 2017 -0800 Committer: Siddharth Seth <[email protected]> Committed: Sun Jan 15 20:15:38 2017 -0800 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../main/java/org/apache/tez/dag/api/DAG.java | 15 +++++ .../org/apache/tez/dag/api/TestDAGVerify.java | 25 ++++++++ .../apache/tez/dag/app/dag/impl/VertexImpl.java | 1 + .../app/rm/container/AMContainerHelpers.java | 16 ++--- .../dag/app/rm/container/AMContainerImpl.java | 4 +- .../tez/dag/app/dag/impl/TestVertexImpl.java | 65 ++++++++++++++++++++ 7 files changed, 113 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/abb350c0/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index f275350..cfc5214 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3574. Container reuse won't pickup extra dag level local resource. TEZ-3443. Remove a repeated/unused method from MRTask. TEZ-3551. FrameworkClient created twice causing minor delay. TEZ-3566. Avoid caching fs isntances in TokenCache after a point. @@ -168,6 +169,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3574. Container reuse won't pickup extra dag level local resource. TEZ-3566. Avoid caching fs isntances in TokenCache after a point. TEZ-3568. Update SecurityUtils configuration to pick user provided configuration. TEZ-3559. TEZ_LIB_URIS doesn't work with schemes different than the defaultFS http://git-wip-us.apache.org/repos/asf/tez/blob/abb350c0/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java index f15c1fb..c136811 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java @@ -685,6 +685,21 @@ public class DAG { } } + // check for conflicts between dag level local resource and vertex level local resource + for (Vertex v : vertices.values()) { + for (Map.Entry<String, LocalResource> localResource : v.getTaskLocalFiles().entrySet()) { + String resourceName = localResource.getKey(); + LocalResource resource = localResource.getValue(); + if (commonTaskLocalFiles.containsKey(resourceName) + && !commonTaskLocalFiles.get(resourceName).equals(resource)) { + throw new IllegalStateException("There is conflicting local resource (" + resourceName + + ") between dag local resource and vertex " + v.getName() + " local resource. " + + "\nResource of dag : " + commonTaskLocalFiles.get(resourceName) + + "\nResource of vertex: " + resource); + } + } + } + return topologicalVertexStack; } http://git-wip-us.apache.org/repos/asf/tez/blob/abb350c0/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java index 794a597..5706542 100644 --- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java +++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java @@ -20,6 +20,7 @@ package org.apache.tez.dag.api; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -1166,4 +1167,28 @@ public class TestDAGVerify { dag.verify(); } + @Test(timeout = 5000) + public void testDAGWithConflictingResource() { + DAG dag = DAG.create("dag"); + Map<String, LocalResource> localResourceMap = new HashMap<>(); + String commonResourceKey = "local resource"; + localResourceMap.put("lr", LocalResource.newInstance(null, LocalResourceType.FILE, + LocalResourceVisibility.APPLICATION, 0, 0)); + dag.addTaskLocalFiles(localResourceMap); + + Vertex v1 = Vertex.create("v", ProcessorDescriptor.create(dummyProcessorClassName), 1); + // same key but different resource + localResourceMap.put("lr", LocalResource.newInstance(null, LocalResourceType.FILE, + LocalResourceVisibility.APPLICATION, 10, 0)); + v1.addTaskLocalFiles(localResourceMap); + + dag.addVertex(v1); + + try { + dag.verify(); + Assert.fail("should report failure on conflict resources"); + } catch (Exception e) { + Assert.assertTrue(e.getMessage().contains("There is conflicting local resource")); + } + } } http://git-wip-us.apache.org/repos/asf/tez/blob/abb350c0/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index bf291b7..4cda98d 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -906,6 +906,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl this.localResources = DagTypeConverters .createLocalResourceMapFromDAGPlan(vertexPlan.getTaskConfig() .getLocalResourceList()); + this.localResources.putAll(dag.getLocalResources()); this.environment = DagTypeConverters .createEnvironmentMapFromDAGPlan(vertexPlan.getTaskConfig() .getEnvironmentSettingList()); http://git-wip-us.apache.org/repos/asf/tez/blob/abb350c0/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java index 11b5006..3bac7b5 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java @@ -94,7 +94,7 @@ public class AMContainerHelpers { */ private static ContainerLaunchContext createCommonContainerLaunchContext( Map<ApplicationAccessType, String> applicationACLs, - Credentials credentials, Map<String, LocalResource> localResources) { + Credentials credentials) { // Application environment Map<String, String> environment = new HashMap<String, String>(); @@ -137,7 +137,7 @@ public class AMContainerHelpers { // The null fields are per-container and will be constructed for each // container separately. ContainerLaunchContext container = - ContainerLaunchContext.newInstance(localResources, environment, null, + ContainerLaunchContext.newInstance(null, environment, null, serviceData, containerCredentialsBuffer, applicationACLs); return container; } @@ -145,7 +145,6 @@ public class AMContainerHelpers { @VisibleForTesting public static ContainerLaunchContext createContainerLaunchContext( TezDAGID tezDAGID, - Map<String, LocalResource> commonDAGLRs, Map<ApplicationAccessType, String> acls, ContainerId containerId, Map<String, LocalResource> localResources, @@ -159,7 +158,7 @@ public class AMContainerHelpers { synchronized (commonContainerSpecLock) { if (!commonContainerSpecs.containsKey(tezDAGID)) { commonContainerSpec = - createCommonContainerLaunchContext(acls, credentials, commonDAGLRs); + createCommonContainerLaunchContext(acls, credentials); commonContainerSpecs.put(tezDAGID, commonContainerSpec); } else { commonContainerSpec = commonContainerSpecs.get(tezDAGID); @@ -175,13 +174,6 @@ public class AMContainerHelpers { } } - // Fill in the fields needed per-container that are missing in the common - // spec. - Map<String, LocalResource> lResources = - new TreeMap<String, LocalResource>(); - lResources.putAll(commonContainerSpec.getLocalResources()); - lResources.putAll(localResources); - // Setup environment by cloning from common env. Map<String, String> env = commonContainerSpec.getEnvironment(); Map<String, String> myEnv = new HashMap<String, String>(env.size()); @@ -214,7 +206,7 @@ public class AMContainerHelpers { // Construct the actual Container ContainerLaunchContext container = - ContainerLaunchContext.newInstance(lResources, myEnv, commands, + ContainerLaunchContext.newInstance(localResources, myEnv, commands, myServiceData, commonContainerSpec.getTokens().duplicate(), acls); return container; http://git-wip-us.apache.org/repos/asf/tez/blob/abb350c0/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java index 94c8fe0..5d73a7b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java @@ -460,10 +460,8 @@ public class AMContainerImpl implements AMContainer { container.credentialsChanged = true; TezDAGID dagId = null; - Map<String, LocalResource> dagLocalResources = null; if (container.appContext.getCurrentDAG() != null) { dagId = container.appContext.getCurrentDAG().getID(); - dagLocalResources = container.appContext.getCurrentDAG().getLocalResources(); } // TODO TEZ-2625 This should ideally be handled inside of user code. Will change once @@ -489,7 +487,7 @@ public class AMContainerImpl implements AMContainer { } ContainerLaunchContext clc = AMContainerHelpers.createContainerLaunchContext( - dagId, dagLocalResources, + dagId, container.appContext.getApplicationACLs(), container.getContainerId(), containerContext.getLocalResources(), http://git-wip-us.apache.org/repos/asf/tez/blob/abb350c0/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java index a11311d..90d675e 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java @@ -51,14 +51,24 @@ import com.google.protobuf.ByteString; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.tez.common.DrainDispatcher; import org.apache.tez.common.TezCommonUtils; import org.apache.tez.common.counters.Limits; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.TezConstants; +import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResource; +import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourceType; +import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourceVisibility; +import org.apache.tez.dag.app.ContainerContext; import org.apache.tez.dag.app.dag.event.TaskEventTAFailed; import org.apache.tez.dag.app.dag.event.TaskEventTALaunched; import org.apache.tez.dag.app.dag.event.TaskEventTASucceeded; +import org.apache.tez.dag.app.rm.AMSchedulerEvent; +import org.apache.tez.dag.app.rm.AMSchedulerEventTALaunchRequest; +import org.apache.tez.dag.app.rm.AMSchedulerEventType; import org.apache.tez.hadoop.shim.DefaultHadoopShim; import org.apache.tez.runtime.api.TaskFailureType; import org.apache.tez.runtime.api.VertexStatistics; @@ -254,6 +264,7 @@ public class TestVertexImpl { private TaskEventDispatcher taskEventDispatcher; private VertexEventDispatcher vertexEventDispatcher; private DagEventDispatcher dagEventDispatcher; + private AMSchedulerEventDispatcher amSchedulerEventDispatcher; private HistoryEventHandler historyEventHandler; private StateChangeNotifierForTest updateTracker; private static TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOption; @@ -414,6 +425,14 @@ public class TestVertexImpl { } } + private class AMSchedulerEventDispatcher implements EventHandler<AMSchedulerEvent> { + List<AMSchedulerEvent> events = new ArrayList<>(); + + public void handle(AMSchedulerEvent event) { + events.add(event); + } + } + private class VertexEventDispatcher implements EventHandler<VertexEvent> { @@ -1619,6 +1638,16 @@ public class TestVertexImpl { LOG.info("Setting up dag plan"); DAGPlan dag = DAGPlan.newBuilder() .setName("testverteximpl") + .addLocalResource( + PlanLocalResource.newBuilder() + .setName("dag lr") + .setUri("dag ir uri") + .setSize(1) + .setTimeStamp(1) + .setType(PlanLocalResourceType.FILE) + .setVisibility(PlanLocalResourceVisibility.APPLICATION) + .build() + ) .addVertex( VertexPlan.newBuilder() .setName("vertex1") @@ -1636,6 +1665,16 @@ public class TestVertexImpl { .setMemoryMb(1024) .setJavaOpts("") .setTaskModule("x1.y1") + .addLocalResource( + PlanLocalResource.newBuilder() + .setName("vertex lr") + .setUri("vertex ir uri") + .setSize(1) + .setTimeStamp(1) + .setType(PlanLocalResourceType.FILE) + .setVisibility(PlanLocalResourceVisibility.APPLICATION) + .build() + ) .build() ) .addOutEdgeId("e1") @@ -2474,6 +2513,13 @@ public class TestVertexImpl { DAG dag = mock(DAG.class); doReturn(ugi).when(dag).getDagUGI(); doReturn(dagName).when(dag).getName(); + Map<String, LocalResource> localResources = new HashMap<>(); + for (PlanLocalResource planLR : dagPlan.getLocalResourceList()) { + localResources.put(planLR.getName(), + DagTypeConverters.convertPlanLocalResourceToLocalResource(planLR)); + } + when(dag.getLocalResources()).thenReturn(localResources); + doReturn(appAttemptId).when(appContext).getApplicationAttemptId(); doReturn(appAttemptId.getApplicationId()).when(appContext).getApplicationID(); doReturn(dag).when(appContext).getCurrentDAG(); @@ -2559,6 +2605,8 @@ public class TestVertexImpl { dispatcher.register(VertexEventType.class, vertexEventDispatcher); dagEventDispatcher = new DagEventDispatcher(); dispatcher.register(DAGEventType.class, dagEventDispatcher); + amSchedulerEventDispatcher = new AMSchedulerEventDispatcher(); + dispatcher.register(AMSchedulerEventType.class, amSchedulerEventDispatcher); dispatcher.init(conf); dispatcher.start(); } @@ -7057,4 +7105,21 @@ public class TestVertexImpl { Assert.assertTrue(v.getLastTaskFinishTime() > 0); } + @Test(timeout = 5000) + public void testPickupDagLocalResourceOnScheduleTask() { + initAllVertices(VertexState.INITED); + VertexImpl v1 = vertices.get("vertex1"); + startVertex(v1); + + TezTaskAttemptID taskAttemptId0 = TezTaskAttemptID.getInstance(v1.getTask(0).getTaskId(), 0); + TaskAttemptImpl ta0 = (TaskAttemptImpl) v1.getTask(0).getAttempt(taskAttemptId0); + ta0.handle(new TaskAttemptEventSchedule(taskAttemptId0, 1, 1)); + + dispatcher.await(); + Assert.assertEquals(1, amSchedulerEventDispatcher.events.size()); + AMSchedulerEventTALaunchRequest launchRequestEvent = (AMSchedulerEventTALaunchRequest) amSchedulerEventDispatcher.events.get(0); + Map<String, LocalResource> localResourceMap = launchRequestEvent.getContainerContext().getLocalResources(); + Assert.assertTrue(localResourceMap.containsKey("dag lr")); + Assert.assertTrue(localResourceMap.containsKey("vertex lr")); + } }
