Repository: tez Updated Branches: refs/heads/branch-0.8 a83479b1c -> e42c9794b
TEZ-3286. Allow clients to set processor reserved memory per vertex (instead of per container). Contributed by Hitesh Shah. (cherry picked from commit 3b08cbf907784de463c9e3c05147b5c6d681251d) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/e42c9794 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/e42c9794 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/e42c9794 Branch: refs/heads/branch-0.8 Commit: e42c9794bd082563773df3f34c2eebc8b356645a Parents: a83479b Author: Siddharth Seth <[email protected]> Authored: Thu Jun 30 16:30:01 2016 -0700 Committer: Siddharth Seth <[email protected]> Committed: Thu Jun 30 16:31:13 2016 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/tez/dag/api/TezConfiguration.java | 12 +-- .../apache/tez/dag/app/dag/impl/DAGImpl.java | 10 ++- .../apache/tez/dag/app/dag/impl/TaskImpl.java | 3 +- .../apache/tez/dag/app/dag/impl/VertexImpl.java | 14 ++- .../tez/dag/app/dag/impl/TestVertexImpl.java | 22 +++-- .../tez/dag/app/dag/impl/TestVertexImpl2.java | 28 +++++- .../tez/dag/app/rm/TestContainerReuse.java | 2 +- .../org/apache/tez/util/ProtoConverters.java | 31 ++++++- .../src/test/proto/TezDaemonProtocol.proto | 1 + .../tez/mapreduce/output/TestMROutput.java | 2 +- .../tez/mapreduce/processor/MapUtils.java | 2 +- .../processor/reduce/TestReduceProcessor.java | 2 +- .../runtime/LogicalIOProcessorRuntimeTask.java | 6 ++ .../apache/tez/runtime/api/impl/TaskSpec.java | 53 ++++++++++-- .../apache/tez/runtime/task/TezTaskRunner2.java | 13 ++- .../TestLogicalIOProcessorRuntimeTask.java | 2 +- .../tez/runtime/api/impl/TestTaskSpec.java | 91 ++++++++++++++++++++ .../tez/runtime/task/TestTaskExecution2.java | 2 +- .../tez/runtime/task/TestTezTaskRunner2.java | 65 ++++++++++++++ 20 files changed, 325 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/e42c9794/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 0e50032..355013c 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3286. Allow clients to set processor reserved memory per vertex (instead of per container). TEZ-3293. Fetch failures can cause a shuffle hang waiting for memory merge that never starts. TEZ-3314. Double counting input bytes in MultiMRInput. TEZ-3308. Add counters to capture input split length. http://git-wip-us.apache.org/repos/asf/tez/blob/e42c9794/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index 4118bb5..936c5db 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -807,7 +807,7 @@ public class TezConfiguration extends Configuration { */ @Private @Unstable - @ConfigurationScope(Scope.AM) + @ConfigurationScope(Scope.VERTEX) @ConfigurationProperty(type="boolean") public static final String TEZ_TASK_SCALE_MEMORY_ENABLED = TEZ_TASK_PREFIX + "scale.memory.enabled"; @@ -819,7 +819,7 @@ public class TezConfiguration extends Configuration { */ @Private @Unstable - @ConfigurationScope(Scope.AM) + @ConfigurationScope(Scope.VERTEX) @ConfigurationProperty public static final String TEZ_TASK_SCALE_MEMORY_ALLOCATOR_CLASS = TEZ_TASK_PREFIX + "scale.memory.allocator.class"; @@ -833,7 +833,7 @@ public class TezConfiguration extends Configuration { */ @Private @Unstable - @ConfigurationScope(Scope.AM) + @ConfigurationScope(Scope.VERTEX) @ConfigurationProperty(type="double") public static final String TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION = TEZ_TASK_PREFIX + "scale.memory.reserve-fraction"; @@ -846,7 +846,7 @@ public class TezConfiguration extends Configuration { */ @Private @Unstable - @ConfigurationScope(Scope.AM) + @ConfigurationScope(Scope.VERTEX) @ConfigurationProperty(type="float") public static final String TEZ_TASK_SCALE_MEMORY_ADDITIONAL_RESERVATION_FRACTION_PER_IO = TEZ_TASK_PREFIX + "scale.memory.additional-reservation.fraction.per-io"; @@ -856,7 +856,7 @@ public class TezConfiguration extends Configuration { /** * Max cumulative total reservation for additional IOs. */ - @ConfigurationScope(Scope.AM) + @ConfigurationScope(Scope.VERTEX) @ConfigurationProperty(type="float") public static final String TEZ_TASK_SCALE_MEMORY_ADDITIONAL_RESERVATION_FRACTION_MAX = TEZ_TASK_PREFIX + "scale.memory.additional-reservation.fraction.max"; @@ -867,7 +867,7 @@ public class TezConfiguration extends Configuration { */ @Private @Unstable - @ConfigurationScope(Scope.AM) + @ConfigurationScope(Scope.VERTEX) @ConfigurationProperty public static final String TEZ_TASK_SCALE_MEMORY_WEIGHTED_RATIOS = TEZ_TASK_PREFIX + "scale.memory.ratios"; http://git-wip-us.apache.org/repos/asf/tez/blob/e42c9794/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java index a6c6c02..fd6d446 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java @@ -198,7 +198,12 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, private long cachedCountersTimestamp = 0; private Set<TezVertexID> reRunningVertices = new HashSet<TezVertexID>(); + // Combined configs for the DAG private final Configuration dagConf; + // DAG specific configs only + // Useful when trying to serialize only the diff from global configs + private final Configuration dagOnlyConf; + private final DAGPlan jobPlan; private final AtomicBoolean internalErrorTriggered = new AtomicBoolean(false); @@ -499,6 +504,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, this.dagId = dagId; this.jobPlan = jobPlan; this.dagConf = new Configuration(amConf); + this.dagOnlyConf = new Configuration(false); Iterator<PlanKeyValuePair> iter = jobPlan.getDagConf().getConfKeyValuesList().iterator(); // override the amConf by using DAG level configuration @@ -506,6 +512,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, PlanKeyValuePair keyValPair = iter.next(); TezConfiguration.validateProperty(keyValPair.getKey(), Scope.DAG); this.dagConf.set(keyValPair.getKey(), keyValPair.getValue()); + this.dagOnlyConf.set(keyValPair.getKey(), keyValPair.getValue()); } this.dagName = (jobPlan.getName() != null) ? jobPlan.getName() : "<missing app name>"; this.userName = appUserName; @@ -1626,7 +1633,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, dag.eventHandler, dag.taskCommunicatorManagerInterface, dag.clock, dag.taskHeartbeatHandler, !dag.commitAllOutputsOnSuccess, dag.appContext, vertexLocationHint, - dag.vertexGroups, dag.taskSpecificLaunchCmdOption, dag.entityUpdateTracker); + dag.vertexGroups, dag.taskSpecificLaunchCmdOption, dag.entityUpdateTracker, + dag.dagOnlyConf); return v; } http://git-wip-us.apache.org/repos/asf/tez/blob/e42c9794/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java index 28a1c5e..ec7db61 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java @@ -723,7 +723,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { TaskSpec taskSpec = new TaskSpec(attemptId, baseTaskSpec.getDAGName(), baseTaskSpec.getVertexName(), baseTaskSpec.getVertexParallelism(), baseTaskSpec.getProcessorDescriptor(), - baseTaskSpec.getInputs(), baseTaskSpec.getOutputs(), baseTaskSpec.getGroupInputs()); + baseTaskSpec.getInputs(), baseTaskSpec.getOutputs(), baseTaskSpec.getGroupInputs(), + baseTaskSpec.getTaskConf()); return new TaskAttemptImpl(attemptId, eventHandler, taskCommunicatorManagerInterface, conf, clock, taskHeartbeatHandler, appContext, (failedAttempts > 0), taskResource, containerContext, leafVertex, getVertex(), http://git-wip-us.apache.org/repos/asf/tez/blob/e42c9794/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index 6b79e98..01bca8f 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -236,7 +236,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl private long cachedCountersTimestamp = 0; private Resource taskResource; + // Merged/combined vertex level config private Configuration vertexConf; + // Vertex specific configs only ( include the dag specific configs too ) + // Useful when trying to serialize only the diff from global configs + @VisibleForTesting + Configuration vertexOnlyConf; private final boolean isSpeculationEnabled; @@ -854,19 +859,22 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl TaskHeartbeatHandler thh, boolean commitVertexOutputs, AppContext appContext, VertexLocationHint vertexLocationHint, Map<String, VertexGroupInfo> dagVertexGroups, TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOption, - StateChangeNotifier entityStatusTracker) { + StateChangeNotifier entityStatusTracker, Configuration dagOnlyConf) { this.vertexId = vertexId; this.vertexPlan = vertexPlan; this.vertexName = StringInterner.weakIntern(vertexName); this.vertexConf = new Configuration(dagConf); - // override dag configuration by using vertex's specified configuration + this.vertexOnlyConf = new Configuration(dagOnlyConf); if (vertexPlan.hasVertexConf()) { ConfigurationProto confProto = vertexPlan.getVertexConf(); for (PlanKeyValuePair keyValuePair : confProto.getConfKeyValuesList()) { TezConfiguration.validateProperty(keyValuePair.getKey(), Scope.VERTEX); vertexConf.set(keyValuePair.getKey(), keyValuePair.getValue()); + vertexOnlyConf.set(keyValuePair.getKey(), keyValuePair.getValue()); } } + + this.clock = clock; this.appContext = appContext; this.commitVertexOutputs = commitVertexOutputs; @@ -1567,7 +1575,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl return TaskSpec.createBaseTaskSpec(getDAG().getName(), getName(), getTotalTasks(), getProcessorDescriptor(), getInputSpecList(taskIndex), getOutputSpecList(taskIndex), - getGroupInputSpecList(taskIndex)); + getGroupInputSpecList(taskIndex), vertexOnlyConf); } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/e42c9794/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java index 6b30a24..d165272 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java @@ -2261,6 +2261,8 @@ public class TestVertexImpl { LOG.info("Setting up vertices from dag plan, verticesCnt=" + vCnt); vertices = new HashMap<String, VertexImpl>(); vertexIdMap = new HashMap<TezVertexID, VertexImpl>(); + Configuration dagConf = new Configuration(false); + dagConf.set("abc", "foobar"); for (int i = 0; i < vCnt; ++i) { VertexPlan vPlan = dagPlan.getVertex(i); String vName = vPlan.getName(); @@ -2272,17 +2274,18 @@ public class TestVertexImpl { if (customInitializer == null) { v = new VertexImplWithControlledInitializerManager(vertexId, vPlan, vPlan.getName(), conf, dispatcher.getEventHandler(), taskCommunicatorManagerInterface, - clock, thh, appContext, locationHint, dispatcher, updateTracker); + clock, thh, appContext, locationHint, dispatcher, updateTracker, dagConf); } else { v = new VertexImplWithRunningInputInitializer(vertexId, vPlan, vPlan.getName(), conf, dispatcher.getEventHandler(), taskCommunicatorManagerInterface, - clock, thh, appContext, locationHint, dispatcher, customInitializer, updateTracker); + clock, thh, appContext, locationHint, dispatcher, customInitializer, updateTracker, + dagConf); } } else { v = new VertexImpl(vertexId, vPlan, vPlan.getName(), conf, dispatcher.getEventHandler(), taskCommunicatorManagerInterface, clock, thh, true, appContext, locationHint, vertexGroups, taskSpecificLaunchCmdOption, - updateTracker); + updateTracker, dagConf); } vertices.put(vName, v); vertexIdMap.put(vertexId, v); @@ -3126,6 +3129,7 @@ public class TestVertexImpl { TaskEventScheduleTask event = (TaskEventScheduleTask) taskEventDispatcher.events.get(0); Assert.assertEquals(mockLocation, event.getTaskLocationHint()); Assert.assertNotNull(event.getBaseTaskSpec()); + Assert.assertEquals("foobar", event.getBaseTaskSpec().getTaskConf().get("abc")); } @Test(timeout = 5000) @@ -5422,7 +5426,7 @@ public class TestVertexImpl { VertexImpl v = new VertexImpl(vId, vPlan, vPlan.getName(), conf, dispatcher.getEventHandler(), taskCommunicatorManagerInterface, clock, thh, true, appContext, vertexLocationHint, null, taskSpecificLaunchCmdOption, - updateTracker); + updateTracker, new Configuration(false)); v.setInputVertices(new HashMap()); vertexIdMap.put(vId, v); vertices.put(v.getName(), v); @@ -5457,11 +5461,12 @@ public class TestVertexImpl { VertexLocationHint vertexLocationHint, DrainDispatcher dispatcher, InputInitializer presetInitializer, - StateChangeNotifier updateTracker) { + StateChangeNotifier updateTracker, + Configuration dagConf) { super(vertexId, vertexPlan, vertexName, conf, eventHandler, taskCommunicatorManagerInterface, clock, thh, true, appContext, vertexLocationHint, null, taskSpecificLaunchCmdOption, - updateTracker); + updateTracker, dagConf); this.presetInitializer = presetInitializer; } @@ -5496,11 +5501,12 @@ public class TestVertexImpl { AppContext appContext, VertexLocationHint vertexLocationHint, DrainDispatcher dispatcher, - StateChangeNotifier updateTracker) { + StateChangeNotifier updateTracker, + Configuration dagConf) { super(vertexId, vertexPlan, vertexName, conf, eventHandler, taskCommunicatorManagerInterface, clock, thh, true, appContext, vertexLocationHint, null, taskSpecificLaunchCmdOption, - updateTracker); + updateTracker, dagConf); this.dispatcher = dispatcher; } http://git-wip-us.apache.org/repos/asf/tez/blob/e42c9794/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java index 8bd288a..b1976c3 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java @@ -49,6 +49,8 @@ import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.api.Vertex.VertexExecutionContext; import org.apache.tez.dag.api.VertexLocationHint; import org.apache.tez.dag.api.records.DAGProtos; +import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto; +import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair; import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto; import org.apache.tez.dag.api.records.DAGProtos.VertexPlan; import org.apache.tez.dag.app.AppContext; @@ -60,6 +62,7 @@ import org.apache.tez.dag.app.dag.DAG; import org.apache.tez.dag.app.dag.StateChangeNotifier; import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.dag.utils.TaskSpecificLaunchCmdOption; +import org.junit.Assert; import org.junit.Test; /** @@ -433,13 +436,18 @@ public class TestVertexImpl2 { ExecutionContextTestInfoHolder vertexInfo) { VertexPlan vertexPlan = createVertexPlanForExeuctionContextTests(vertexInfo); VertexWrapper vertexWrapper = - new VertexWrapper(vertexInfo.appContext, vertexPlan, new Configuration(false)); + new VertexWrapper(vertexInfo.appContext, vertexPlan, new Configuration(false), true); return vertexWrapper; } private VertexPlan createVertexPlanForExeuctionContextTests(ExecutionContextTestInfoHolder info) { + ConfigurationProto confProto = ConfigurationProto.newBuilder() + .addConfKeyValues(PlanKeyValuePair.newBuilder().setKey("foo").setValue("bar").build()) + .addConfKeyValues(PlanKeyValuePair.newBuilder().setKey("foo1").setValue("bar2").build()) + .build(); VertexPlan.Builder vertexPlanBuilder = VertexPlan.newBuilder() .setName(info.vertexName) + .setVertexConf(confProto) .setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder() .setNumTasks(10) .setJavaOpts("dontcare") @@ -502,7 +510,8 @@ public class TestVertexImpl2 { final VertexImpl vertex; final VertexPlan vertexPlan; - VertexWrapper(AppContext appContext, VertexPlan vertexPlan, Configuration conf) { + VertexWrapper(AppContext appContext, VertexPlan vertexPlan, Configuration conf, + boolean checkVertexOnlyConf) { if (appContext == null) { mockAppContext = createDefaultMockAppContext(); DAG mockDag = mock(DAG.class); @@ -512,6 +521,9 @@ public class TestVertexImpl2 { mockAppContext = appContext; } + Configuration dagConf = new Configuration(false); + dagConf.set("abc1", "xyz1"); + dagConf.set("foo1", "bar1"); this.vertexPlan = vertexPlan; @@ -520,11 +532,19 @@ public class TestVertexImpl2 { "testvertex", conf, mock(EventHandler.class), mock(TaskCommunicatorManagerInterface.class), mock(Clock.class), mock(TaskHeartbeatHandler.class), false, mockAppContext, VertexLocationHint.create(new LinkedList<TaskLocationHint>()), null, - new TaskSpecificLaunchCmdOption(conf), mock(StateChangeNotifier.class)); + new TaskSpecificLaunchCmdOption(conf), mock(StateChangeNotifier.class), + dagConf); + + if (checkVertexOnlyConf) { + Assert.assertEquals("xyz1", vertex.vertexOnlyConf.get("abc1")); + Assert.assertEquals("bar2", vertex.vertexOnlyConf.get("foo1")); + Assert.assertEquals("bar", vertex.vertexOnlyConf.get("foo")); + } + } VertexWrapper(VertexPlan vertexPlan, Configuration conf) { - this(null, vertexPlan, conf); + this(null, vertexPlan, conf, false); } } http://git-wip-us.apache.org/repos/asf/tez/blob/e42c9794/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java index 99c85ab..a45f092 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java @@ -1417,7 +1417,7 @@ public class TestContainerReuse { Collections.singletonList(new InputSpec("vertexName", InputDescriptor.create("inputClassName"), 1)), Collections.singletonList(new OutputSpec("vertexName", - OutputDescriptor.create("outputClassName"), 1)), null), ta, locationHint, + OutputDescriptor.create("outputClassName"), 1)), null, null), ta, locationHint, priority.getPriority(), containerContext, 0, 0, 0); return lr; } http://git-wip-us.apache.org/repos/asf/tez/blob/e42c9794/tez-ext-service-tests/src/test/java/org/apache/tez/util/ProtoConverters.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/util/ProtoConverters.java b/tez-ext-service-tests/src/test/java/org/apache/tez/util/ProtoConverters.java index 60ebc53..25d61d0 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/util/ProtoConverters.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/util/ProtoConverters.java @@ -15,12 +15,18 @@ package org.apache.tez.util; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import org.apache.hadoop.conf.Configuration; import org.apache.tez.dag.api.DagTypeConverters; import org.apache.tez.dag.api.InputDescriptor; import org.apache.tez.dag.api.OutputDescriptor; import org.apache.tez.dag.api.ProcessorDescriptor; +import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto; +import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.runtime.api.impl.GroupInputSpec; import org.apache.tez.runtime.api.impl.InputSpec; @@ -29,6 +35,7 @@ import org.apache.tez.runtime.api.impl.TaskSpec; import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.GroupInputSpecProto; import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.IOSpecProto; import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.TaskSpecProto; +import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.TaskSpecProto.Builder; public class ProtoConverters { @@ -65,15 +72,24 @@ public class ProtoConverters { } } + Configuration taskConf = null; + if (taskSpecProto.hasTaskConf()) { + taskConf = new Configuration(false); + Map<String, String> confMap = + DagTypeConverters.convertConfFromProto(taskSpecProto.getTaskConf()); + for (Entry<String, String> e : confMap.entrySet()) { + taskConf.set(e.getKey(), e.getValue()); + } + } TaskSpec taskSpec = new TaskSpec(taskAttemptID, taskSpecProto.getDagName(), taskSpecProto.getVertexName(), taskSpecProto.getVertexParallelism(), processorDescriptor, inputSpecList, - outputSpecList, groupInputSpecs); + outputSpecList, groupInputSpecs, taskConf); return taskSpec; } public static TaskSpecProto convertTaskSpecToProto(TaskSpec taskSpec) { - TaskSpecProto.Builder builder = TaskSpecProto.newBuilder(); + Builder builder = TaskSpecProto.newBuilder(); builder.setTaskAttemptIdString(taskSpec.getTaskAttemptID().toString()); builder.setDagName(taskSpec.getDAGName()); builder.setVertexName(taskSpec.getVertexName()); @@ -102,6 +118,17 @@ public class ProtoConverters { } } + if (taskSpec.getTaskConf() != null) { + ConfigurationProto.Builder confBuilder = ConfigurationProto.newBuilder(); + Iterator<Entry<String, String>> iter = taskSpec.getTaskConf().iterator(); + while (iter.hasNext()) { + Entry<String, String> entry = iter.next(); + confBuilder.addConfKeyValues(PlanKeyValuePair.newBuilder() + .setKey(entry.getKey()) + .setValue(entry.getValue()).build()); + } + builder.setTaskConf(confBuilder.build()); + } return builder.build(); } http://git-wip-us.apache.org/repos/asf/tez/blob/e42c9794/tez-ext-service-tests/src/test/proto/TezDaemonProtocol.proto ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/proto/TezDaemonProtocol.proto b/tez-ext-service-tests/src/test/proto/TezDaemonProtocol.proto index 2f8b2e6..a01a299 100644 --- a/tez-ext-service-tests/src/test/proto/TezDaemonProtocol.proto +++ b/tez-ext-service-tests/src/test/proto/TezDaemonProtocol.proto @@ -44,6 +44,7 @@ message TaskSpecProto { repeated IOSpecProto output_specs = 6; repeated GroupInputSpecProto grouped_input_specs = 7; optional int32 vertex_parallelism = 8; + optional ConfigurationProto task_conf = 9; } http://git-wip-us.apache.org/repos/asf/tez/blob/e42c9794/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java index 0c1dc66..05bcd98 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java @@ -192,7 +192,7 @@ public class TestMROutput { dagName, vertexName, -1, procDesc, inputSpecs, - outputSpecs, null); + outputSpecs, null, null); FileSystem fs = FileSystem.getLocal(conf); Path workDir = http://git-wip-us.apache.org/repos/asf/tez/blob/e42c9794/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java index 133ef9e..8309966 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java @@ -213,7 +213,7 @@ public class MapUtils { dagName, vertexName, -1, mapProcessorDesc, inputSpecs, - outputSpecs, null); + outputSpecs, null, null); Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<String, ByteBuffer>(); serviceConsumerMetadata.put(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID, http://git-wip-us.apache.org/repos/asf/tez/blob/e42c9794/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java index 382bc0e..1922c53 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java @@ -205,7 +205,7 @@ public class TestReduceProcessor { reduceVertexName, -1, reduceProcessorDesc, Collections.singletonList(reduceInputSpec), - Collections.singletonList(reduceOutputSpec), null); + Collections.singletonList(reduceOutputSpec), null, null); Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<String, ByteBuffer>(); serviceConsumerMetadata.put(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID, http://git-wip-us.apache.org/repos/asf/tez/blob/e42c9794/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java index 0863e65..e49791f 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java @@ -1054,4 +1054,10 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { public HadoopShim getHadoopShim() { return hadoopShim; } + + @Private + @VisibleForTesting + public Configuration getTaskConf() { + return tezConf; + } } http://git-wip-us.apache.org/repos/asf/tez/blob/e42c9794/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java index 7fce1d4..78bb1e9 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java @@ -25,6 +25,8 @@ import java.util.ArrayList; import java.util.List; import javax.annotation.Nullable; + +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Writable; import org.apache.hadoop.util.StringInterner; import org.apache.tez.dag.api.ProcessorDescriptor; @@ -42,6 +44,7 @@ public class TaskSpec implements Writable { private List<OutputSpec> outputSpecList; private List<GroupInputSpec> groupInputSpecList; private int vertexParallelism = -1; + private Configuration taskConf; public TaskSpec() { } @@ -49,17 +52,27 @@ public class TaskSpec implements Writable { public static TaskSpec createBaseTaskSpec(String dagName, String vertexName, int vertexParallelism, ProcessorDescriptor processorDescriptor, List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList, - @Nullable List<GroupInputSpec> groupInputSpecList) { + @Nullable List<GroupInputSpec> groupInputSpecList, Configuration taskConf) { return new TaskSpec(dagName, vertexName, vertexParallelism, processorDescriptor, inputSpecList, - outputSpecList, groupInputSpecList); + outputSpecList, groupInputSpecList, taskConf); } public TaskSpec( String dagName, String vertexName, int vertexParallelism, ProcessorDescriptor processorDescriptor, - List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList, + List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList, @Nullable List<GroupInputSpec> groupInputSpecList) { + this(dagName, vertexName, vertexParallelism, processorDescriptor, inputSpecList, + outputSpecList, groupInputSpecList, null); + } + + public TaskSpec( + String dagName, String vertexName, + int vertexParallelism, + ProcessorDescriptor processorDescriptor, + List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList, + @Nullable List<GroupInputSpec> groupInputSpecList, Configuration taskConf) { checkNotNull(dagName, "dagName is null"); checkNotNull(vertexName, "vertexName is null"); checkNotNull(processorDescriptor, "processorDescriptor is null"); @@ -73,14 +86,25 @@ public class TaskSpec implements Writable { this.outputSpecList = outputSpecList; this.groupInputSpecList = groupInputSpecList; this.vertexParallelism = vertexParallelism; + this.taskConf = taskConf; + } + + public TaskSpec(TezTaskAttemptID taskAttemptID, + String dagName, String vertexName, + int vertexParallelism, + ProcessorDescriptor processorDescriptor, + List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList, + @Nullable List<GroupInputSpec> groupInputSpecList) { + this(taskAttemptID, dagName, vertexName, vertexParallelism, processorDescriptor, inputSpecList, + outputSpecList, groupInputSpecList, null); } public TaskSpec(TezTaskAttemptID taskAttemptID, String dagName, String vertexName, int vertexParallelism, ProcessorDescriptor processorDescriptor, - List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList, - @Nullable List<GroupInputSpec> groupInputSpecList) { + List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList, + @Nullable List<GroupInputSpec> groupInputSpecList, Configuration taskConf) { checkNotNull(taskAttemptID, "taskAttemptID is null"); checkNotNull(dagName, "dagName is null"); checkNotNull(vertexName, "vertexName is null"); @@ -95,6 +119,7 @@ public class TaskSpec implements Writable { this.outputSpecList = outputSpecList; this.groupInputSpecList = groupInputSpecList; this.vertexParallelism = vertexParallelism; + this.taskConf = taskConf; } public String getDAGName() { @@ -133,6 +158,10 @@ public class TaskSpec implements Writable { return groupInputSpecList; } + public Configuration getTaskConf() { + return taskConf; + } + @Override public void write(DataOutput out) throws IOException { taskAttemptId.write(out); @@ -157,6 +186,12 @@ public class TaskSpec implements Writable { } else { out.writeBoolean(false); } + if (taskConf != null) { + out.writeBoolean(true); + taskConf.write(out); + } else { + out.writeBoolean(false); + } } @Override @@ -192,6 +227,11 @@ public class TaskSpec implements Writable { groupInputSpecList.add(group); } } + boolean hasVertexConf = in.readBoolean(); + if (hasVertexConf) { + taskConf = new Configuration(false); + taskConf.readFields(in); + } } @Override @@ -220,6 +260,9 @@ public class TaskSpec implements Writable { } sb.append("]"); } + if (taskConf != null) { + sb.append(", taskConfEntryCount=" + taskConf.size()); + } return sb.toString(); } http://git-wip-us.apache.org/repos/asf/tez/blob/e42c9794/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java index afa08e7..96f8474 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java @@ -17,7 +17,9 @@ package org.apache.tez.runtime.task; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Collection; +import java.util.Iterator; import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -93,6 +95,7 @@ public class TezTaskRunner2 { private final Condition oobSignalCondition = oobSignalLock.newCondition(); private volatile long taskKillStartTime = 0; + final Configuration taskConf; private final HadoopShim hadoopShim; @@ -114,7 +117,15 @@ public class TezTaskRunner2 { this.executor = executor; this.umbilicalAndErrorHandler = new UmbilicalAndErrorHandler(); this.hadoopShim = hadoopShim; - this.task = new LogicalIOProcessorRuntimeTask(taskSpec, appAttemptNumber, tezConf, localDirs, + this.taskConf = new Configuration(tezConf); + if (taskSpec.getTaskConf() != null) { + Iterator<Entry<String, String>> iter = taskSpec.getTaskConf().iterator(); + while (iter.hasNext()) { + Entry<String, String> entry = iter.next(); + taskConf.set(entry.getKey(), entry.getValue()); + } + } + this.task = new LogicalIOProcessorRuntimeTask(taskSpec, appAttemptNumber, taskConf, localDirs, umbilicalAndErrorHandler, serviceConsumerMetadata, serviceProviderEnvMap, startedInputsMap, objectRegistry, pid, executionContext, memAvailable, updateSysCounters, hadoopShim); } http://git-wip-us.apache.org/repos/asf/tez/blob/e42c9794/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java index 00e830f..ecfc424 100644 --- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java +++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java @@ -190,7 +190,7 @@ public class TestLogicalIOProcessorRuntimeTask { ProcessorDescriptor processorDesc = createProcessorDescriptor(); TaskSpec taskSpec = new TaskSpec(taskAttemptID, dagName, vertexName, parallelism, processorDesc, - createInputSpecList(), createOutputSpecList(), null); + createInputSpecList(), createOutputSpecList(), null, null); return taskSpec; } http://git-wip-us.apache.org/repos/asf/tez/blob/e42c9794/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestTaskSpec.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestTaskSpec.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestTaskSpec.java new file mode 100644 index 0000000..dfe19f8 --- /dev/null +++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestTaskSpec.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.runtime.api.impl; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.dag.api.InputDescriptor; +import org.apache.tez.dag.api.OutputDescriptor; +import org.apache.tez.dag.api.ProcessorDescriptor; +import org.apache.tez.dag.api.UserPayload; +import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezTaskID; +import org.apache.tez.dag.records.TezVertexID; +import org.junit.Assert; +import org.junit.Test; + +public class TestTaskSpec { + + @Test (timeout = 5000) + public void testSerDe() throws IOException { + ByteBuffer payload = null; + ProcessorDescriptor procDesc = ProcessorDescriptor.create("proc").setUserPayload( + UserPayload.create(payload)).setHistoryText("historyText"); + + List<InputSpec> inputSpecs = new ArrayList<>(); + InputSpec inputSpec = new InputSpec("src1", InputDescriptor.create("inputClass"),10); + inputSpecs.add(inputSpec); + List<OutputSpec> outputSpecs = new ArrayList<>(); + OutputSpec outputSpec = new OutputSpec("dest1", OutputDescriptor.create("outputClass"), 999); + outputSpecs.add(outputSpec); + List<GroupInputSpec> groupInputSpecs = null; + + Configuration taskConf = new Configuration(false); + taskConf.set("foo", "bar"); + + TezTaskAttemptID taId = TezTaskAttemptID.getInstance(TezTaskID.getInstance( + TezVertexID.getInstance(TezDAGID.getInstance("1234", 1, 1), 1), 1), 1); + TaskSpec taskSpec = new TaskSpec(taId, "dagName", "vName", -1, procDesc, inputSpecs, outputSpecs, + groupInputSpecs, taskConf); + + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutput out = new DataOutputStream(bos); + taskSpec.write(out); + + TaskSpec deSerTaskSpec = new TaskSpec(); + ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray()); + DataInput in = new DataInputStream(bis); + deSerTaskSpec.readFields(in); + + Assert.assertEquals(taskSpec.getDAGName(), deSerTaskSpec.getDAGName()); + Assert.assertEquals(taskSpec.getVertexName(), deSerTaskSpec.getVertexName()); + Assert.assertEquals(taskSpec.getVertexParallelism(), deSerTaskSpec.getVertexParallelism()); + Assert.assertEquals(taskSpec.getInputs().size(), deSerTaskSpec.getInputs().size()); + Assert.assertEquals(taskSpec.getOutputs().size(), deSerTaskSpec.getOutputs().size()); + Assert.assertNull(deSerTaskSpec.getGroupInputs()); + Assert.assertEquals(taskSpec.getInputs().get(0).getSourceVertexName(), + deSerTaskSpec.getInputs().get(0).getSourceVertexName()); + Assert.assertEquals(taskSpec.getOutputs().get(0).getDestinationVertexName(), + deSerTaskSpec.getOutputs().get(0).getDestinationVertexName()); + + Assert.assertEquals(taskConf.get("foo"), deSerTaskSpec.getTaskConf().get("foo")); + } + +} http://git-wip-us.apache.org/repos/asf/tez/blob/e42c9794/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java index c3b9abd..6cb49fa 100644 --- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java +++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java @@ -740,7 +740,7 @@ public class TestTaskExecution2 { .setUserPayload(UserPayload.create(ByteBuffer.wrap(processorConf))); TaskSpec taskSpec = new TaskSpec(taskAttemptId, "dagName", "vertexName", -1, processorDescriptor, - new ArrayList<InputSpec>(), new ArrayList<OutputSpec>(), null); + new ArrayList<InputSpec>(), new ArrayList<OutputSpec>(), null, null); TezTaskRunner2 taskRunner; if (testRunner) { http://git-wip-us.apache.org/repos/asf/tez/blob/e42c9794/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTezTaskRunner2.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTezTaskRunner2.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTezTaskRunner2.java new file mode 100644 index 0000000..f58421a --- /dev/null +++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTezTaskRunner2.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.runtime.task; + +import static org.mockito.Mockito.*; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.tez.common.ProtoConverters; +import org.apache.tez.dag.api.ProcessorDescriptor; +import org.apache.tez.hadoop.shim.DefaultHadoopShim; +import org.apache.tez.runtime.api.impl.InputSpec; +import org.apache.tez.runtime.api.impl.OutputSpec; +import org.apache.tez.runtime.api.impl.TaskSpec; +import org.junit.Assert; +import org.junit.Test; + + +public class TestTezTaskRunner2 { + + @Test (timeout = 5000) + public void testTaskConfUsage() throws Exception { + Configuration conf = new Configuration(false); + conf.set("global", "global1"); + conf.set("global_override", "global1"); + String[] localDirs = null; + Configuration taskConf = new Configuration(false); + conf.set("global_override", "task1"); + conf.set("task", "task1"); + + List<InputSpec> inputSpecList = new ArrayList<>(); + List<OutputSpec> outputSpecList = new ArrayList<>(); + TaskSpec taskSpec = new TaskSpec("dagName", "vertexName", 1, mock(ProcessorDescriptor.class), + inputSpecList, outputSpecList, null, taskConf); + TezTaskRunner2 taskRunner2 = new TezTaskRunner2(conf, mock(UserGroupInformation.class), + localDirs, taskSpec, 1, null, null, null, mock(TaskReporter.class), null, null, "pid", + null, 1000, false, new DefaultHadoopShim()); + + Assert.assertEquals("global1", taskRunner2.task.getTaskConf().get("global")); + Assert.assertEquals("task1", taskRunner2.task.getTaskConf().get("global_override")); + Assert.assertEquals("task1", taskRunner2.task.getTaskConf().get("task")); + } + + +}
