Repository: tez Updated Branches: refs/heads/master d9802627b -> 2c212858a
TEZ-3290. Set full task attempt id string in MRInput configuration object. Contributed by Prasanth Jayachandran. Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/2c212858 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/2c212858 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/2c212858 Branch: refs/heads/master Commit: 2c212858ab91fb49ca8e6ab36bf51328adcccb43 Parents: d980262 Author: Siddharth Seth <[email protected]> Authored: Wed Jun 8 15:02:50 2016 -0700 Committer: Siddharth Seth <[email protected]> Committed: Wed Jun 8 15:02:50 2016 -0700 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../tez/mapreduce/hadoop/MRInputHelpers.java | 40 ++++++++++++++++++++ .../org/apache/tez/mapreduce/input/MRInput.java | 4 ++ .../tez/mapreduce/input/base/MRInputBase.java | 13 +++++++ .../apache/tez/mapreduce/input/TestMRInput.java | 10 +++++ 5 files changed, 69 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/2c212858/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index b542036..3fcb936 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3290. Set full task attempt id string in MRInput configuration object. TEZ-2846. Flaky test: TestCommit.testVertexCommit_OnDAGSuccess. TEZ-3289. Tez Example MRRSleep job does not set Staging dir correctly on secure cluster. TEZ-3276. Tez Example MRRSleep job fails when tez.staging-dir fs is not same as default FS. @@ -55,6 +56,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3290. Set full task attempt id string in MRInput configuration object. TEZ-3280. LOG MRInputHelpers split generation message as INFO TEZ-3257. Fix flaky test TestUnorderedPartitionedKVWriter. TEZ-3237. Corrupted shuffle transfers to disk are not detected during transfer http://git-wip-us.apache.org/repos/asf/tez/blob/2c212858/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java index 034d379..2a806ab 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java @@ -730,6 +730,16 @@ public class MRInputHelpers { } /** + * Returns string representation of full DAG identifier + * @param conf configuration instance + * @return dag identifier + */ + @Public + public static String getDagIdString(Configuration conf) { + return getStringProperty(conf, MRInput.TEZ_MAPREDUCE_DAG_ID); + } + + /** * * @see {@link InputContext#getTaskVertexIndex} * @param conf configuration instance * @return vertex index @@ -740,6 +750,16 @@ public class MRInputHelpers { } /** + * Returns string representation of full vertex identifier + * @param conf configuration instance + * @return vertex identifier + */ + @Public + public static String getVertexIdString(Configuration conf) { + return getStringProperty(conf, MRInput.TEZ_MAPREDUCE_VERTEX_ID); + } + + /** * @see {@link InputContext#getTaskIndex} * @param conf configuration instance * @return task index @@ -750,6 +770,16 @@ public class MRInputHelpers { } /** + * Returns string representation of full task identifier + * @param conf configuration instance + * @return task identifier + */ + @Public + public static String getTaskIdString(Configuration conf) { + return getStringProperty(conf, MRInput.TEZ_MAPREDUCE_TASK_ID); + } + + /** * @see {@link InputContext#getTaskAttemptNumber} * @param conf configuration instance * @return task attempt index @@ -760,6 +790,16 @@ public class MRInputHelpers { } /** + * Returns string representation of full task attempt identifier + * @param conf configuration instance + * @return task attempt identifier + */ + @Public + public static String getTaskAttemptIdString(Configuration conf) { + return getStringProperty(conf, MRInput.TEZ_MAPREDUCE_TASK_ATTEMPT_ID); + } + + /** * @see {@link InputContext#getInputIndex} * @param conf configuration instance * @return input index http://git-wip-us.apache.org/repos/asf/tez/blob/2c212858/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java index 4a4ba86..e859058 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java @@ -94,6 +94,10 @@ public class MRInput extends MRInputBase { @Private public static final String TEZ_MAPREDUCE_APPLICATION_ID = "tez.mapreduce.application.id"; @Private public static final String TEZ_MAPREDUCE_UNIQUE_IDENTIFIER = "tez.mapreduce.unique.identifier"; @Private public static final String TEZ_MAPREDUCE_DAG_ATTEMPT_NUMBER = "tez.mapreduce.dag.attempt.number"; + @Private public static final String TEZ_MAPREDUCE_DAG_ID= "tez.mapreduce.dag.id"; + @Private public static final String TEZ_MAPREDUCE_VERTEX_ID = "tez.mapreduce.vertex.id"; + @Private public static final String TEZ_MAPREDUCE_TASK_ID = "tez.mapreduce.task.id"; + @Private public static final String TEZ_MAPREDUCE_TASK_ATTEMPT_ID = "tez.mapreduce.task.attempt.id"; http://git-wip-us.apache.org/repos/asf/tez/blob/2c212858/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/base/MRInputBase.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/base/MRInputBase.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/base/MRInputBase.java index 230f55e..9a26c2b 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/base/MRInputBase.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/base/MRInputBase.java @@ -30,6 +30,10 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.tez.common.TezUtils; import org.apache.tez.common.counters.TaskCounter; import org.apache.tez.common.counters.TezCounter; +import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezTaskID; +import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.mapreduce.hadoop.MRInputHelpers; import org.apache.tez.mapreduce.hadoop.MRJobConfig; import org.apache.tez.mapreduce.input.MRInput; @@ -109,6 +113,15 @@ public abstract class MRInputBase extends AbstractLogicalInput { jobConf.set(MRInput.TEZ_MAPREDUCE_UNIQUE_IDENTIFIER, getContext().getUniqueIdentifier()); jobConf.setInt(MRInput.TEZ_MAPREDUCE_DAG_ATTEMPT_NUMBER, getContext().getDAGAttemptNumber()); + TezDAGID tezDAGID = TezDAGID.getInstance(getContext().getApplicationId(), getContext().getDagIdentifier()); + TezVertexID tezVertexID = TezVertexID.getInstance(tezDAGID, getContext().getTaskVertexIndex()); + TezTaskID tezTaskID = TezTaskID.getInstance(tezVertexID, getContext().getTaskIndex()); + TezTaskAttemptID tezTaskAttemptID = TezTaskAttemptID.getInstance(tezTaskID, getContext().getTaskAttemptNumber()); + jobConf.set(MRInput.TEZ_MAPREDUCE_DAG_ID, tezDAGID.toString()); + jobConf.set(MRInput.TEZ_MAPREDUCE_VERTEX_ID, tezVertexID.toString()); + jobConf.set(MRInput.TEZ_MAPREDUCE_TASK_ID, tezTaskID.toString()); + jobConf.set(MRInput.TEZ_MAPREDUCE_TASK_ATTEMPT_ID, tezTaskAttemptID.toString()); + this.inputRecordCounter = getContext().getCounters().findCounter( TaskCounter.INPUT_RECORDS_PROCESSED); http://git-wip-us.apache.org/repos/asf/tez/blob/2c212858/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java index b42ef25..b878416 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java @@ -97,6 +97,11 @@ public class TestMRInput { private static final int TEST_ATTRIBUTES_TASK_ATTEMPT_INDEX = 4000; private static final int TEST_ATTRIBUTES_INPUT_INDEX = 5000; private static final int TEST_ATTRIBUTES_DAG_ATTEMPT_NUMBER = 6000; + private static final String TEST_ATTRIBUTES_APPLICATION_ID_STRING = "application_0_0000"; + private static final String TEST_ATTRIBUTES_DAG_ID_STRING = "dag_0_0000_1000"; + private static final String TEST_ATTRIBUTES_VERTEX_ID_STRING = "vertex_0_0000_1000_2000"; + private static final String TEST_ATTRIBUTES_TASK_ID_STRING = "task_0_0000_1000_2000_003000"; + private static final String TEST_ATTRIBUTES_TASK_ATTEMPT_ID_STRING = "attempt_0_0000_1000_2000_003000_4000"; @Test(timeout = 5000) public void testAttributesInJobConf() throws Exception { @@ -163,6 +168,11 @@ public class TestMRInput { assertEquals(TEST_ATTRIBUTES_TASK_ATTEMPT_INDEX, MRInputHelpers.getTaskAttemptIndex(job)); assertEquals(TEST_ATTRIBUTES_INPUT_INDEX, MRInputHelpers.getInputIndex(job)); assertEquals(TEST_ATTRIBUTES_DAG_ATTEMPT_NUMBER, MRInputHelpers.getDagAttemptNumber(job)); + assertEquals(TEST_ATTRIBUTES_APPLICATION_ID_STRING, MRInputHelpers.getApplicationIdString(job)); + assertEquals(TEST_ATTRIBUTES_DAG_ID_STRING, MRInputHelpers.getDagIdString(job)); + assertEquals(TEST_ATTRIBUTES_VERTEX_ID_STRING, MRInputHelpers.getVertexIdString(job)); + assertEquals(TEST_ATTRIBUTES_TASK_ID_STRING, MRInputHelpers.getTaskIdString(job)); + assertEquals(TEST_ATTRIBUTES_TASK_ATTEMPT_ID_STRING, MRInputHelpers.getTaskAttemptIdString(job)); invoked.set(true); return new RecordReader() { @Override
