TEZ-2368. Make a dag identifier available in Context classes. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/9e9cf995 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/9e9cf995 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/9e9cf995 Branch: refs/heads/TEZ-2003 Commit: 9e9cf995b497ab0ba56cd14b060fbc30efbed824 Parents: c3232d0 Author: Siddharth Seth <[email protected]> Authored: Mon Apr 27 16:09:04 2015 -0700 Committer: Siddharth Seth <[email protected]> Committed: Mon Apr 27 16:09:04 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/tez/runtime/api/TaskContext.java | 8 ++ .../runtime/api/impl/TezTaskContextImpl.java | 5 + .../runtime/api/impl/TestProcessorContext.java | 102 +++++++++++++++++++ 4 files changed, 116 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/9e9cf995/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 36e1767..b582b85 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly ALL CHANGES: + TEZ-2368. Make a dag identifier available in Context classes. TEZ-2325. Route status update event directly to the attempt. TEZ-2358. Pipelined Shuffle: MergeManager assumptions about 1 merge per source-task. TEZ-2342. TestFaultTolerance.testRandomFailingTasks fails due to timeout. http://git-wip-us.apache.org/repos/asf/tez/blob/9e9cf995/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java index 8e22057..92d5575 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java @@ -78,6 +78,14 @@ public interface TaskContext { */ public int getTaskVertexIndex(); + /** + * Get a numeric identifier for the dag to which the task belongs. This will be unique within the + * running application. + * + * @return the dag identifier + */ + public int getDagIdentifier(); + public TezCounters getCounters(); /** http://git-wip-us.apache.org/repos/asf/tez/blob/9e9cf995/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java index 6c0a869..a156f54 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java @@ -145,6 +145,11 @@ public abstract class TezTaskContextImpl implements TaskContext { } @Override + public int getDagIdentifier() { + return taskAttemptID.getTaskID().getVertexID().getDAGId().getId(); + } + + @Override public TezCounters getCounters() { return counters; } http://git-wip-us.apache.org/repos/asf/tez/blob/9e9cf995/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java new file mode 100644 index 0000000..e28df3a --- /dev/null +++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java @@ -0,0 +1,102 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.runtime.api.impl; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Map; + +import com.google.common.collect.Maps; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.dag.api.ProcessorDescriptor; +import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezTaskID; +import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.runtime.InputReadyTracker; +import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask; +import org.apache.tez.runtime.api.ExecutionContext; +import org.apache.tez.runtime.api.ObjectRegistry; +import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl; +import org.apache.tez.runtime.common.resources.MemoryDistributor; +import org.junit.Test; + +public class TestProcessorContext { + + @Test (timeout = 5000) + public void testDagNumber() { + String[] localDirs = new String[] {"dummyLocalDir"}; + int appAttemptNumber = 1; + TezUmbilical tezUmbilical = mock(TezUmbilical.class); + String dagName = "DAG_NAME"; + String vertexName = "VERTEX_NAME"; + int vertexParallelism = 20; + int dagNumber = 52; + ApplicationId appId = ApplicationId.newInstance(10000, 13); + TezDAGID dagId = TezDAGID.getInstance(appId, dagNumber); + TezVertexID vertexId = TezVertexID.getInstance(dagId, 6); + TezTaskID taskId = TezTaskID.getInstance(vertexId, 4); + TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance(taskId, 2); + + LogicalIOProcessorRuntimeTask runtimeTask = mock(LogicalIOProcessorRuntimeTask.class); + doReturn(new TezCounters()).when(runtimeTask).addAndGetTezCounter(any(String.class)); + Map<String, ByteBuffer> serviceConsumerMetadata = Maps.newHashMap(); + Map<String, String> auxServiceEnv = Maps.newHashMap(); + MemoryDistributor memDist = mock(MemoryDistributor.class); + ProcessorDescriptor processorDesc = mock(ProcessorDescriptor.class); + InputReadyTracker inputReadyTracker = mock(InputReadyTracker.class); + ObjectRegistry objectRegistry = new ObjectRegistryImpl(); + ExecutionContext execContext = new ExecutionContextImpl("localhost"); + long memAvailable = 10000l; + + TezProcessorContextImpl procContext = + new TezProcessorContextImpl( + new Configuration(), + localDirs, + appAttemptNumber, + tezUmbilical, + dagName, + vertexName, + vertexParallelism, + taskAttemptId, + null, + runtimeTask, + serviceConsumerMetadata, + auxServiceEnv, + memDist, + processorDesc, + inputReadyTracker, + objectRegistry, + execContext, + memAvailable); + + assertEquals(dagNumber, procContext.getDagIdentifier()); + assertEquals(appAttemptNumber, procContext.getDAGAttemptNumber()); + assertEquals(appId, procContext.getApplicationId()); + assertEquals(dagName, procContext.getDAGName()); + assertEquals(vertexName, procContext.getTaskVertexName()); + assertEquals(vertexId.getId(), procContext.getTaskVertexIndex()); + assertTrue(Arrays.equals(localDirs, procContext.getWorkDirs())); + + } +}
