Repository: tez
Updated Branches:
  refs/heads/branch-0.7 b06afabb9 -> 8eb707e4e
TEZ-3286. Allow clients to set processor reserved memory per vertex (instead of per container). (hitesh) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/8eb707e4 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/8eb707e4 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/8eb707e4 Branch: refs/heads/branch-0.7 Commit: 8eb707e4e4ef8113d0265d21f1f0c0cb729e066f Parents: b06afab Author: Hitesh Shah <[email protected]> Authored: Thu Jul 7 12:52:31 2016 -0700 Committer: Hitesh Shah <[email protected]> Committed: Thu Jul 7 12:52:31 2016 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/tez/dag/api/TezConfiguration.java | 12 +-- .../apache/tez/dag/app/dag/impl/DAGImpl.java | 10 ++- .../apache/tez/dag/app/dag/impl/TaskImpl.java | 6 +- .../apache/tez/dag/app/dag/impl/VertexImpl.java | 14 ++- .../tez/dag/app/dag/impl/TestVertexImpl.java | 22 +++-- .../tez/dag/app/dag/impl/TestVertexImpl2.java | 18 +++- .../tez/dag/app/rm/TestContainerReuse.java | 2 +- .../tez/mapreduce/output/TestMROutput.java | 2 +- .../tez/mapreduce/processor/MapUtils.java | 2 +- .../processor/reduce/TestReduceProcessor.java | 2 +- .../runtime/LogicalIOProcessorRuntimeTask.java | 6 ++ .../apache/tez/runtime/api/impl/TaskSpec.java | 53 ++++++++++-- .../apache/tez/runtime/task/TezTaskRunner.java | 17 +++- .../TestLogicalIOProcessorRuntimeTask.java | 2 +- .../tez/runtime/api/impl/TestTaskSpec.java | 91 ++++++++++++++++++++ .../tez/runtime/task/TestTezTaskRunner.java | 61 +++++++++++++ 17 files changed, 288 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/8eb707e4/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 8696a59..4626e93 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES 
ALL CHANGES: + TEZ-3286. Allow clients to set processor reserved memory per vertex (instead of per container). TEZ-3223. Support a NullHistoryLogger to disable history logging if needed. TEZ-3293. Fetch failures can cause a shuffle hang waiting for memory merge that never starts. TEZ-3305. TestAnalyzer fails on Hadoop 2.7. http://git-wip-us.apache.org/repos/asf/tez/blob/8eb707e4/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index 3bbc0f9..a96d560 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -749,7 +749,7 @@ public class TezConfiguration extends Configuration { */ @Private @Unstable - @ConfigurationScope(Scope.AM) + @ConfigurationScope(Scope.VERTEX) public static final String TEZ_TASK_SCALE_MEMORY_ENABLED = TEZ_TASK_PREFIX + "scale.memory.enabled"; @Private @@ -760,7 +760,7 @@ public class TezConfiguration extends Configuration { */ @Private @Unstable - @ConfigurationScope(Scope.AM) + @ConfigurationScope(Scope.VERTEX) public static final String TEZ_TASK_SCALE_MEMORY_ALLOCATOR_CLASS = TEZ_TASK_PREFIX + "scale.memory.allocator.class"; @Private @@ -773,7 +773,7 @@ public class TezConfiguration extends Configuration { */ @Private @Unstable - @ConfigurationScope(Scope.AM) + @ConfigurationScope(Scope.VERTEX) public static final String TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION = TEZ_TASK_PREFIX + "scale.memory.reserve-fraction"; @Private @@ -785,7 +785,7 @@ public class TezConfiguration extends Configuration { */ @Private @Unstable - @ConfigurationScope(Scope.AM) + @ConfigurationScope(Scope.VERTEX) public static final String TEZ_TASK_SCALE_MEMORY_ADDITIONAL_RESERVATION_FRACTION_PER_IO = TEZ_TASK_PREFIX + 
"scale.memory.additional-reservation.fraction.per-io"; @@ -794,7 +794,7 @@ public class TezConfiguration extends Configuration { /** * Max cumulative total reservation for additional IOs. */ - @ConfigurationScope(Scope.AM) + @ConfigurationScope(Scope.VERTEX) public static final String TEZ_TASK_SCALE_MEMORY_ADDITIONAL_RESERVATION_FRACTION_MAX = TEZ_TASK_PREFIX + "scale.memory.additional-reservation.fraction.max"; /* @@ -804,7 +804,7 @@ public class TezConfiguration extends Configuration { */ @Private @Unstable - @ConfigurationScope(Scope.AM) + @ConfigurationScope(Scope.VERTEX) public static final String TEZ_TASK_SCALE_MEMORY_WEIGHTED_RATIOS = TEZ_TASK_PREFIX + "scale.memory.ratios"; http://git-wip-us.apache.org/repos/asf/tez/blob/8eb707e4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java index 6b98a7b..b7ad3b1 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java @@ -194,7 +194,12 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, private long cachedCountersTimestamp = 0; private Set<TezVertexID> reRunningVertices = new HashSet<TezVertexID>(); + // Combined configs for the DAG private final Configuration dagConf; + // DAG specific configs only + // Useful when trying to serialize only the diff from global configs + private final Configuration dagOnlyConf; + private final DAGPlan jobPlan; private final AtomicBoolean internalErrorTriggered = new AtomicBoolean(false); @@ -502,6 +507,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, this.dagId = dagId; this.jobPlan = jobPlan; this.dagConf = new Configuration(amConf); + this.dagOnlyConf = new Configuration(false); Iterator<PlanKeyValuePair> iter = 
jobPlan.getDagConf().getConfKeyValuesList().iterator(); // override the amConf by using DAG level configuration @@ -509,6 +515,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, PlanKeyValuePair keyValPair = iter.next(); TezConfiguration.validateProperty(keyValPair.getKey(), Scope.DAG); this.dagConf.set(keyValPair.getKey(), keyValPair.getValue()); + this.dagOnlyConf.set(keyValPair.getKey(), keyValPair.getValue()); } this.dagName = (jobPlan.getName() != null) ? jobPlan.getName() : "<missing app name>"; this.userName = appUserName; @@ -1649,7 +1656,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, dag.eventHandler, dag.taskAttemptListener, dag.clock, dag.taskHeartbeatHandler, !dag.commitAllOutputsOnSuccess, dag.appContext, vertexLocationHint, - dag.vertexGroups, dag.taskSpecificLaunchCmdOption, dag.entityUpdateTracker); + dag.vertexGroups, dag.taskSpecificLaunchCmdOption, dag.entityUpdateTracker, + dag.dagOnlyConf); return v; } http://git-wip-us.apache.org/repos/asf/tez/blob/8eb707e4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java index 4a0742f..2dfd7f2 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java @@ -830,9 +830,11 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { TezTaskAttemptID attemptId = TezBuilderUtils.newTaskAttemptId(taskId, attemptNumber); TaskSpec taskSpec = null; if (baseTaskSpec != null) { - taskSpec = new TaskSpec(attemptId, baseTaskSpec.getDAGName(), baseTaskSpec.getVertexName(), + taskSpec = new TaskSpec(attemptId, + baseTaskSpec.getDAGName(), baseTaskSpec.getVertexName(), baseTaskSpec.getVertexParallelism(), baseTaskSpec.getProcessorDescriptor(), - 
baseTaskSpec.getInputs(), baseTaskSpec.getOutputs(), baseTaskSpec.getGroupInputs()); + baseTaskSpec.getInputs(), baseTaskSpec.getOutputs(), baseTaskSpec.getGroupInputs(), + baseTaskSpec.getTaskConf()); } return new TaskAttemptImpl(attemptId, eventHandler, taskAttemptListener, conf, clock, taskHeartbeatHandler, appContext, http://git-wip-us.apache.org/repos/asf/tez/blob/8eb707e4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index 0d6bc68..1e875d2 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -231,7 +231,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl private long cachedCountersTimestamp = 0; private Resource taskResource; + // Merged/combined vertex level config private Configuration vertexConf; + // Vertex specific configs only ( include the dag specific configs too ) + // Useful when trying to serialize only the diff from global configs + @VisibleForTesting + Configuration vertexOnlyConf; private final boolean isSpeculationEnabled; @@ -875,19 +880,22 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl TaskHeartbeatHandler thh, boolean commitVertexOutputs, AppContext appContext, VertexLocationHint vertexLocationHint, Map<String, VertexGroupInfo> dagVertexGroups, TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOption, - StateChangeNotifier entityStatusTracker) { + StateChangeNotifier entityStatusTracker, Configuration dagOnlyConf) { this.vertexId = vertexId; this.vertexPlan = vertexPlan; this.vertexName = StringInterner.weakIntern(vertexName); this.vertexConf = new Configuration(dagConf); - // override dag configuration by using 
vertex's specified configuration + this.vertexOnlyConf = new Configuration(dagOnlyConf); if (vertexPlan.hasVertexConf()) { ConfigurationProto confProto = vertexPlan.getVertexConf(); for (PlanKeyValuePair keyValuePair : confProto.getConfKeyValuesList()) { TezConfiguration.validateProperty(keyValuePair.getKey(), Scope.VERTEX); vertexConf.set(keyValuePair.getKey(), keyValuePair.getValue()); + vertexOnlyConf.set(keyValuePair.getKey(), keyValuePair.getValue()); } } + + this.clock = clock; this.appContext = appContext; this.commitVertexOutputs = commitVertexOutputs; @@ -1538,7 +1546,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl return TaskSpec.createBaseTaskSpec(getDAG().getName(), getName(), getTotalTasks(), getProcessorDescriptor(), getInputSpecList(taskIndex), getOutputSpecList(taskIndex), - getGroupInputSpecList(taskIndex)); + getGroupInputSpecList(taskIndex), vertexOnlyConf); } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/8eb707e4/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java index f0a8625..520d10f 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java @@ -2258,6 +2258,8 @@ public class TestVertexImpl { LOG.info("Setting up vertices from dag plan, verticesCnt=" + vCnt); vertices = new HashMap<String, VertexImpl>(); vertexIdMap = new HashMap<TezVertexID, VertexImpl>(); + Configuration dagConf = new Configuration(false); + dagConf.set("abc", "foobar"); for (int i = 0; i < vCnt; ++i) { VertexPlan vPlan = dagPlan.getVertex(i); String vName = vPlan.getName(); @@ -2269,17 +2271,18 @@ public class TestVertexImpl { if (customInitializer == null) { v = new 
VertexImplWithControlledInitializerManager(vertexId, vPlan, vPlan.getName(), conf, dispatcher.getEventHandler(), taskAttemptListener, - clock, thh, appContext, locationHint, dispatcher, updateTracker); + clock, thh, appContext, locationHint, dispatcher, updateTracker, dagConf); } else { v = new VertexImplWithRunningInputInitializer(vertexId, vPlan, vPlan.getName(), conf, dispatcher.getEventHandler(), taskAttemptListener, - clock, thh, appContext, locationHint, dispatcher, customInitializer, updateTracker); + clock, thh, appContext, locationHint, dispatcher, customInitializer, updateTracker, + dagConf); } } else { v = new VertexImpl(vertexId, vPlan, vPlan.getName(), conf, dispatcher.getEventHandler(), taskAttemptListener, clock, thh, true, appContext, locationHint, vertexGroups, taskSpecificLaunchCmdOption, - updateTracker); + updateTracker, dagConf); } vertices.put(vName, v); vertexIdMap.put(vertexId, v); @@ -3120,6 +3123,7 @@ public class TestVertexImpl { TaskEventScheduleTask event = (TaskEventScheduleTask) taskEventDispatcher.events.get(0); Assert.assertEquals(mockLocation, event.getTaskLocationHint()); Assert.assertNotNull(event.getBaseTaskSpec()); + Assert.assertEquals("foobar", event.getBaseTaskSpec().getTaskConf().get("abc")); } @Test(timeout = 5000) @@ -5417,7 +5421,7 @@ public class TestVertexImpl { VertexImpl v = new VertexImpl(vId, vPlan, vPlan.getName(), conf, dispatcher.getEventHandler(), taskAttemptListener, clock, thh, true, appContext, vertexLocationHint, null, taskSpecificLaunchCmdOption, - updateTracker); + updateTracker, new Configuration(false)); v.setInputVertices(new HashMap()); vertexIdMap.put(vId, v); vertices.put(v.getName(), v); @@ -5452,11 +5456,12 @@ public class TestVertexImpl { VertexLocationHint vertexLocationHint, DrainDispatcher dispatcher, InputInitializer presetInitializer, - StateChangeNotifier updateTracker) { + StateChangeNotifier updateTracker, + Configuration dagConf) { super(vertexId, vertexPlan, vertexName, conf, 
eventHandler, taskAttemptListener, clock, thh, true, appContext, vertexLocationHint, null, taskSpecificLaunchCmdOption, - updateTracker); + updateTracker, dagConf); this.presetInitializer = presetInitializer; } @@ -5491,11 +5496,12 @@ public class TestVertexImpl { AppContext appContext, VertexLocationHint vertexLocationHint, DrainDispatcher dispatcher, - StateChangeNotifier updateTracker) { + StateChangeNotifier updateTracker, + Configuration dagConf) { super(vertexId, vertexPlan, vertexName, conf, eventHandler, taskAttemptListener, clock, thh, true, appContext, vertexLocationHint, null, taskSpecificLaunchCmdOption, - updateTracker); + updateTracker, dagConf); this.dispatcher = dispatcher; } http://git-wip-us.apache.org/repos/asf/tez/blob/8eb707e4/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java index b4064a0..483c3a5 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java @@ -39,6 +39,8 @@ import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.api.VertexLocationHint; import org.apache.tez.dag.api.records.DAGProtos; +import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto; +import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.ContainerContext; import org.apache.tez.dag.app.TaskAttemptListener; @@ -271,8 +273,14 @@ public class TestVertexImpl2 { doReturn(new Credentials()).when(mockDag).getCredentials(); doReturn(mockDag).when(mockAppContext).getCurrentDAG(); + ConfigurationProto confProto = ConfigurationProto.newBuilder() + 
.addConfKeyValues(PlanKeyValuePair.newBuilder().setKey("foo").setValue("bar").build()) + .addConfKeyValues(PlanKeyValuePair.newBuilder().setKey("foo1").setValue("bar2").build()) + .build(); + vertexPlan = DAGProtos.VertexPlan.newBuilder() .setName(vertexName) + .setVertexConf(confProto) .setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder() .setJavaOpts(initialJavaOpts) .setNumTasks(numTasks) @@ -286,12 +294,20 @@ public class TestVertexImpl2 { .build()) .setType(DAGProtos.PlanVertexType.NORMAL).build(); + Configuration dagConf = new Configuration(false); + dagConf.set("abc1", "xyz1"); + dagConf.set("foo1", "bar1"); + vertex = new VertexImpl(TezVertexID.fromString("vertex_1418197758681_0001_1_00"), vertexPlan, "testvertex", conf, mock(EventHandler.class), mock(TaskAttemptListener.class), mock(Clock.class), mock(TaskHeartbeatHandler.class), false, mockAppContext, VertexLocationHint.create(new LinkedList<TaskLocationHint>()), null, - new TaskSpecificLaunchCmdOption(conf), mock(StateChangeNotifier.class)); + new TaskSpecificLaunchCmdOption(conf), mock(StateChangeNotifier.class), dagConf); + + assertEquals("xyz1", vertex.vertexOnlyConf.get("abc1")); + assertEquals("bar2", vertex.vertexOnlyConf.get("foo1")); + assertEquals("bar", vertex.vertexOnlyConf.get("foo")); } } } http://git-wip-us.apache.org/repos/asf/tez/blob/8eb707e4/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java index 7ad044b..d944e5d 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java @@ -1455,7 +1455,7 @@ public class TestContainerReuse { Collections.singletonList(new InputSpec("vertexName", InputDescriptor.create("inputClassName"), 1)), 
Collections.singletonList(new OutputSpec("vertexName", - OutputDescriptor.create("outputClassName"), 1)), null), ta, locationHint, + OutputDescriptor.create("outputClassName"), 1)), null, null), ta, locationHint, priority.getPriority(), containerContext); return lr; } http://git-wip-us.apache.org/repos/asf/tez/blob/8eb707e4/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java index 45a24cb..bfeaad8 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java @@ -191,7 +191,7 @@ public class TestMROutput { dagName, vertexName, -1, procDesc, inputSpecs, - outputSpecs, null); + outputSpecs, null, null); FileSystem fs = FileSystem.getLocal(conf); Path workDir = http://git-wip-us.apache.org/repos/asf/tez/blob/8eb707e4/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java index 8841882..0d0eace 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java @@ -212,7 +212,7 @@ public class MapUtils { dagName, vertexName, -1, mapProcessorDesc, inputSpecs, - outputSpecs, null); + outputSpecs, null, null); Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<String, ByteBuffer>(); serviceConsumerMetadata.put(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID, 
http://git-wip-us.apache.org/repos/asf/tez/blob/8eb707e4/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java index fcb42b3..21d2929 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java @@ -204,7 +204,7 @@ public class TestReduceProcessor { reduceVertexName, -1, reduceProcessorDesc, Collections.singletonList(reduceInputSpec), - Collections.singletonList(reduceOutputSpec), null); + Collections.singletonList(reduceOutputSpec), null, null); Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<String, ByteBuffer>(); serviceConsumerMetadata.put(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID, http://git-wip-us.apache.org/repos/asf/tez/blob/8eb707e4/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java index 11ef31d..70e2a18 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java @@ -967,4 +967,10 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { return this.outputsMap; } + @Private + @VisibleForTesting + public Configuration getTaskConf() { + return tezConf; + } + } 
http://git-wip-us.apache.org/repos/asf/tez/blob/8eb707e4/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java index 4dc57e2..e082bf8 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java @@ -25,6 +25,8 @@ import java.util.ArrayList; import java.util.List; import javax.annotation.Nullable; + +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Writable; import org.apache.hadoop.util.StringInterner; import org.apache.tez.dag.api.ProcessorDescriptor; @@ -42,6 +44,7 @@ public class TaskSpec implements Writable { private List<OutputSpec> outputSpecList; private List<GroupInputSpec> groupInputSpecList; private int vertexParallelism = -1; + private Configuration taskConf; public TaskSpec() { } @@ -49,17 +52,27 @@ public class TaskSpec implements Writable { public static TaskSpec createBaseTaskSpec(String dagName, String vertexName, int vertexParallelism, ProcessorDescriptor processorDescriptor, List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList, - @Nullable List<GroupInputSpec> groupInputSpecList) { + @Nullable List<GroupInputSpec> groupInputSpecList, Configuration taskConf) { return new TaskSpec(dagName, vertexName, vertexParallelism, processorDescriptor, inputSpecList, - outputSpecList, groupInputSpecList); + outputSpecList, groupInputSpecList, taskConf); } public TaskSpec( String dagName, String vertexName, int vertexParallelism, ProcessorDescriptor processorDescriptor, - List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList, + List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList, @Nullable List<GroupInputSpec> 
groupInputSpecList) { + this(dagName, vertexName, vertexParallelism, processorDescriptor, inputSpecList, + outputSpecList, groupInputSpecList, null); + } + + public TaskSpec( + String dagName, String vertexName, + int vertexParallelism, + ProcessorDescriptor processorDescriptor, + List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList, + @Nullable List<GroupInputSpec> groupInputSpecList, Configuration taskConf) { checkNotNull(dagName, "dagName is null"); checkNotNull(vertexName, "vertexName is null"); checkNotNull(processorDescriptor, "processorDescriptor is null"); @@ -73,14 +86,25 @@ public class TaskSpec implements Writable { this.outputSpecList = outputSpecList; this.groupInputSpecList = groupInputSpecList; this.vertexParallelism = vertexParallelism; + this.taskConf = taskConf; + } + + public TaskSpec(TezTaskAttemptID taskAttemptID, + String dagName, String vertexName, + int vertexParallelism, + ProcessorDescriptor processorDescriptor, + List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList, + @Nullable List<GroupInputSpec> groupInputSpecList) { + this(taskAttemptID, dagName, vertexName, vertexParallelism, processorDescriptor, inputSpecList, + outputSpecList, groupInputSpecList, null); } public TaskSpec(TezTaskAttemptID taskAttemptID, String dagName, String vertexName, int vertexParallelism, ProcessorDescriptor processorDescriptor, - List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList, - @Nullable List<GroupInputSpec> groupInputSpecList) { + List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList, + @Nullable List<GroupInputSpec> groupInputSpecList, Configuration taskConf) { checkNotNull(taskAttemptID, "taskAttemptID is null"); checkNotNull(dagName, "dagName is null"); checkNotNull(vertexName, "vertexName is null"); @@ -95,6 +119,7 @@ public class TaskSpec implements Writable { this.outputSpecList = outputSpecList; this.groupInputSpecList = groupInputSpecList; this.vertexParallelism = vertexParallelism; + this.taskConf = 
taskConf; } public String getDAGName() { @@ -129,6 +154,10 @@ public class TaskSpec implements Writable { return groupInputSpecList; } + public Configuration getTaskConf() { + return taskConf; + } + @Override public void write(DataOutput out) throws IOException { taskAttemptId.write(out); @@ -153,6 +182,12 @@ public class TaskSpec implements Writable { } else { out.writeBoolean(false); } + if (taskConf != null) { + out.writeBoolean(true); + taskConf.write(out); + } else { + out.writeBoolean(false); + } } @Override @@ -188,6 +223,11 @@ public class TaskSpec implements Writable { groupInputSpecList.add(group); } } + boolean hasVertexConf = in.readBoolean(); + if (hasVertexConf) { + taskConf = new Configuration(false); + taskConf.readFields(in); + } } @Override @@ -216,6 +256,9 @@ public class TaskSpec implements Writable { } sb.append("]"); } + if (taskConf != null) { + sb.append(", taskConfEntryCount=" + taskConf.size()); + } return sb.toString(); } http://git-wip-us.apache.org/repos/asf/tez/blob/8eb707e4/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java index de83889..c4cae15 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java @@ -23,7 +23,9 @@ import java.lang.reflect.UndeclaredThrowableException; import java.nio.ByteBuffer; import java.security.PrivilegedExceptionAction; import java.util.Collection; +import java.util.Iterator; import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; @@ -44,6 +46,7 @@ import org.apache.tez.runtime.api.impl.TezUmbilical; import 
org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Multimap; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; @@ -53,11 +56,13 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter { private static final Logger LOG = LoggerFactory.getLogger(TezTaskRunner.class); private final Configuration tezConf; - private final LogicalIOProcessorRuntimeTask task; + @VisibleForTesting + final LogicalIOProcessorRuntimeTask task; private final UserGroupInformation ugi; private final TaskReporter taskReporter; private final ListeningExecutorService executor; + final Configuration taskConf; private volatile ListenableFuture<Void> taskFuture; private volatile Thread waitingThread; private volatile Throwable firstException; @@ -78,7 +83,15 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter { this.ugi = ugi; this.taskReporter = taskReporter; this.executor = executor; - task = new LogicalIOProcessorRuntimeTask(taskSpec, appAttemptNumber, tezConf, localDirs, this, + this.taskConf = new Configuration(tezConf); + if (taskSpec.getTaskConf() != null) { + Iterator<Entry<String, String>> iter = taskSpec.getTaskConf().iterator(); + while (iter.hasNext()) { + Entry<String, String> entry = iter.next(); + taskConf.set(entry.getKey(), entry.getValue()); + } + } + task = new LogicalIOProcessorRuntimeTask(taskSpec, appAttemptNumber, taskConf, localDirs, this, serviceConsumerMetadata, serviceProviderEnvMap, startedInputsMap, objectRegistry, pid, executionContext, memAvailable); taskRunning = new AtomicBoolean(false); http://git-wip-us.apache.org/repos/asf/tez/blob/8eb707e4/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java ---------------------------------------------------------------------- diff --git 
a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java index 520ad69..08e909e 100644 --- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java +++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java @@ -192,7 +192,7 @@ public class TestLogicalIOProcessorRuntimeTask { ProcessorDescriptor processorDesc = createProcessorDescriptor(); TaskSpec taskSpec = new TaskSpec(taskAttemptID, dagName, vertexName, parallelism, processorDesc, - createInputSpecList(), createOutputSpecList(), null); + createInputSpecList(), createOutputSpecList(), null, null); return taskSpec; } http://git-wip-us.apache.org/repos/asf/tez/blob/8eb707e4/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestTaskSpec.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestTaskSpec.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestTaskSpec.java new file mode 100644 index 0000000..f3cb49b --- /dev/null +++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestTaskSpec.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.runtime.api.impl; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.dag.api.InputDescriptor; +import org.apache.tez.dag.api.OutputDescriptor; +import org.apache.tez.dag.api.ProcessorDescriptor; +import org.apache.tez.dag.api.UserPayload; +import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezTaskID; +import org.apache.tez.dag.records.TezVertexID; +import org.junit.Assert; +import org.junit.Test; + +public class TestTaskSpec { + + @Test (timeout = 5000) + public void testSerDe() throws IOException { + ByteBuffer payload = null; + ProcessorDescriptor procDesc = ProcessorDescriptor.create("proc").setUserPayload( + UserPayload.create(payload)).setHistoryText("historyText"); + + List<InputSpec> inputSpecs = new ArrayList<InputSpec>(); + InputSpec inputSpec = new InputSpec("src1", InputDescriptor.create("inputClass"),10); + inputSpecs.add(inputSpec); + List<OutputSpec> outputSpecs = new ArrayList<OutputSpec>(); + OutputSpec outputSpec = new OutputSpec("dest1", OutputDescriptor.create("outputClass"), 999); + outputSpecs.add(outputSpec); + List<GroupInputSpec> groupInputSpecs = null; + + Configuration taskConf = 
new Configuration(false); + taskConf.set("foo", "bar"); + + TezTaskAttemptID taId = TezTaskAttemptID.getInstance(TezTaskID.getInstance( + TezVertexID.getInstance(TezDAGID.getInstance("1234", 1, 1), 1), 1), 1); + TaskSpec taskSpec = new TaskSpec(taId, "dagName", "vName", -1, procDesc, inputSpecs, outputSpecs, + groupInputSpecs, taskConf); + + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutput out = new DataOutputStream(bos); + taskSpec.write(out); + + TaskSpec deSerTaskSpec = new TaskSpec(); + ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray()); + DataInput in = new DataInputStream(bis); + deSerTaskSpec.readFields(in); + + Assert.assertEquals(taskSpec.getDAGName(), deSerTaskSpec.getDAGName()); + Assert.assertEquals(taskSpec.getVertexName(), deSerTaskSpec.getVertexName()); + Assert.assertEquals(taskSpec.getVertexParallelism(), deSerTaskSpec.getVertexParallelism()); + Assert.assertEquals(taskSpec.getInputs().size(), deSerTaskSpec.getInputs().size()); + Assert.assertEquals(taskSpec.getOutputs().size(), deSerTaskSpec.getOutputs().size()); + Assert.assertNull(deSerTaskSpec.getGroupInputs()); + Assert.assertEquals(taskSpec.getInputs().get(0).getSourceVertexName(), + deSerTaskSpec.getInputs().get(0).getSourceVertexName()); + Assert.assertEquals(taskSpec.getOutputs().get(0).getDestinationVertexName(), + deSerTaskSpec.getOutputs().get(0).getDestinationVertexName()); + + Assert.assertEquals(taskConf.get("foo"), deSerTaskSpec.getTaskConf().get("foo")); + } + +} http://git-wip-us.apache.org/repos/asf/tez/blob/8eb707e4/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTezTaskRunner.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTezTaskRunner.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTezTaskRunner.java new file mode 100644 index 0000000..c943e02 --- /dev/null +++ 
b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTezTaskRunner.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.runtime.task; + +import static org.mockito.Mockito.*; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.tez.dag.api.ProcessorDescriptor; +import org.apache.tez.runtime.api.impl.InputSpec; +import org.apache.tez.runtime.api.impl.OutputSpec; +import org.apache.tez.runtime.api.impl.TaskSpec; +import org.junit.Assert; +import org.junit.Test; + +public class TestTezTaskRunner { + + @Test (timeout = 5000) + public void testTaskConfUsage() throws Exception { + Configuration conf = new Configuration(false); + conf.set("global", "global1"); + conf.set("global_override", "global1"); + String[] localDirs = null; + Configuration taskConf = new Configuration(false); + conf.set("global_override", "task1"); + conf.set("task", "task1"); + + List<InputSpec> inputSpecList = new ArrayList<InputSpec>(); + List<OutputSpec> outputSpecList = new ArrayList<OutputSpec>(); + TaskSpec taskSpec = new TaskSpec("dagName", 
"vertexName", 1, mock(ProcessorDescriptor.class), + inputSpecList, outputSpecList, null, taskConf); + TezTaskRunner taskRunner = new TezTaskRunner(conf, mock(UserGroupInformation.class), + localDirs, taskSpec, 1, null, null, null, mock(TaskReporter.class), null, null, "pid", + null, 1000); + + Assert.assertEquals("global1", taskRunner.task.getTaskConf().get("global")); + Assert.assertEquals("task1", taskRunner.task.getTaskConf().get("global_override")); + Assert.assertEquals("task1", taskRunner.task.getTaskConf().get("task")); + } + + +}
