TEZ-2368. Make a dag identifier available in Context classes. (sseth)

Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/9e9cf995
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/9e9cf995
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/9e9cf995

Branch: refs/heads/TEZ-2003
Commit: 9e9cf995b497ab0ba56cd14b060fbc30efbed824
Parents: c3232d0
Author: Siddharth Seth <[email protected]>
Authored: Mon Apr 27 16:09:04 2015 -0700
Committer: Siddharth Seth <[email protected]>
Committed: Mon Apr 27 16:09:04 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/tez/runtime/api/TaskContext.java |   8 ++
 .../runtime/api/impl/TezTaskContextImpl.java    |   5 +
 .../runtime/api/impl/TestProcessorContext.java  | 102 +++++++++++++++++++
 4 files changed, 116 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/9e9cf995/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 36e1767..b582b85 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES
   TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly
 
 ALL CHANGES:
+  TEZ-2368. Make a dag identifier available in Context classes.
   TEZ-2325. Route status update event directly to the attempt.
   TEZ-2358. Pipelined Shuffle: MergeManager assumptions about 1 merge per 
source-task.
   TEZ-2342. TestFaultTolerance.testRandomFailingTasks fails due to timeout.

http://git-wip-us.apache.org/repos/asf/tez/blob/9e9cf995/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java 
b/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java
index 8e22057..92d5575 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java
@@ -78,6 +78,14 @@ public interface TaskContext {
    */
   public int getTaskVertexIndex();
 
+  /**
+   * Get a numeric identifier for the dag to which the task belongs. This will 
be unique within the
+   * running application.
+   *
+   * @return the dag identifier
+   */
+  public int getDagIdentifier();
+
   public TezCounters getCounters();
 
   /**

http://git-wip-us.apache.org/repos/asf/tez/blob/9e9cf995/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
 
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
index 6c0a869..a156f54 100644
--- 
a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
+++ 
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
@@ -145,6 +145,11 @@ public abstract class TezTaskContextImpl implements 
TaskContext {
   }
 
   @Override
+  public int getDagIdentifier() {
+    return taskAttemptID.getTaskID().getVertexID().getDAGId().getId();
+  }
+
+  @Override
   public TezCounters getCounters() {
     return counters;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/9e9cf995/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java
 
b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java
new file mode 100644
index 0000000..e28df3a
--- /dev/null
+++ 
b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Map;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.InputReadyTracker;
+import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
+import org.apache.tez.runtime.api.ExecutionContext;
+import org.apache.tez.runtime.api.ObjectRegistry;
+import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl;
+import org.apache.tez.runtime.common.resources.MemoryDistributor;
+import org.junit.Test;
+
+public class TestProcessorContext {
+
+  @Test (timeout = 5000)
+  public void testDagNumber() {
+    String[] localDirs = new String[] {"dummyLocalDir"};
+    int appAttemptNumber = 1;
+    TezUmbilical tezUmbilical = mock(TezUmbilical.class);
+    String dagName = "DAG_NAME";
+    String vertexName = "VERTEX_NAME";
+    int vertexParallelism = 20;
+    int dagNumber = 52;
+    ApplicationId appId = ApplicationId.newInstance(10000, 13);
+    TezDAGID dagId = TezDAGID.getInstance(appId, dagNumber);
+    TezVertexID vertexId = TezVertexID.getInstance(dagId, 6);
+    TezTaskID taskId = TezTaskID.getInstance(vertexId, 4);
+    TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance(taskId, 2);
+
+    LogicalIOProcessorRuntimeTask runtimeTask = 
mock(LogicalIOProcessorRuntimeTask.class);
+    doReturn(new 
TezCounters()).when(runtimeTask).addAndGetTezCounter(any(String.class));
+    Map<String, ByteBuffer> serviceConsumerMetadata = Maps.newHashMap();
+    Map<String, String> auxServiceEnv = Maps.newHashMap();
+    MemoryDistributor memDist = mock(MemoryDistributor.class);
+    ProcessorDescriptor processorDesc = mock(ProcessorDescriptor.class);
+    InputReadyTracker inputReadyTracker = mock(InputReadyTracker.class);
+    ObjectRegistry objectRegistry = new ObjectRegistryImpl();
+    ExecutionContext execContext = new ExecutionContextImpl("localhost");
+    long memAvailable = 10000l;
+
+    TezProcessorContextImpl procContext =
+        new TezProcessorContextImpl(
+            new Configuration(),
+            localDirs,
+            appAttemptNumber,
+            tezUmbilical,
+            dagName,
+            vertexName,
+            vertexParallelism,
+            taskAttemptId,
+            null,
+            runtimeTask,
+            serviceConsumerMetadata,
+            auxServiceEnv,
+            memDist,
+            processorDesc,
+            inputReadyTracker,
+            objectRegistry,
+            execContext,
+            memAvailable);
+
+    assertEquals(dagNumber, procContext.getDagIdentifier());
+    assertEquals(appAttemptNumber, procContext.getDAGAttemptNumber());
+    assertEquals(appId, procContext.getApplicationId());
+    assertEquals(dagName, procContext.getDAGName());
+    assertEquals(vertexName, procContext.getTaskVertexName());
+    assertEquals(vertexId.getId(), procContext.getTaskVertexIndex());
+    assertTrue(Arrays.equals(localDirs, procContext.getWorkDirs()));
+
+  }
+}

Reply via email to