Repository: tez Updated Branches: refs/heads/master f352cfb4d -> 99c85d3f9
TEZ-3090. MRInput should make dagIdentifier, vertexIdentifier, etc. available to the InputFormat jobConf. (Siddharth Seth) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/99c85d3f Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/99c85d3f Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/99c85d3f Branch: refs/heads/master Commit: 99c85d3f95ac9bbee9c507c4efdc2757ea5b8542 Parents: f352cfb Author: Siddharth Seth <[email protected]> Authored: Wed Feb 10 20:37:50 2016 -0800 Committer: Siddharth Seth <[email protected]> Committed: Wed Feb 10 20:37:50 2016 -0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../tez/mapreduce/hadoop/MRInputHelpers.java | 124 +++++++++++++++ .../org/apache/tez/mapreduce/input/MRInput.java | 16 +- .../tez/mapreduce/input/base/MRInputBase.java | 12 ++ .../apache/tez/mapreduce/input/TestMRInput.java | 151 +++++++++++++++++++ .../tez/mapreduce/input/TestMultiMRInput.java | 2 + 6 files changed, 305 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/99c85d3f/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 4d7ae6b..c769843 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -6,6 +6,7 @@ Release 0.8.3: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3090. MRInput should make dagIdentifier, vertexIdentifier, etc available to the InputFormat jobConf. TEZ-3093. CriticalPathAnalyzer should be accessible via zeppelin. TEZ-3089. TaskConcurrencyAnalyzer can return negative task count with very large jobs. TEZ-2307. 
Possible wrong error message when submitting new dag http://git-wip-us.apache.org/repos/asf/tez/blob/99c85d3f/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java index 30e4a8c..325e7b2 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java @@ -35,6 +35,7 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.protobuf.ByteString; +import org.apache.tez.runtime.api.InputContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -707,4 +708,127 @@ public class MRInputHelpers { return UserPayload.create(userPayloadBuilder.build().toByteString().asReadOnlyByteBuffer()); } + + private static String getStringProperty(Configuration conf, String propertyName) { + Preconditions.checkNotNull(conf, "Configuration must be provided"); + String propertyString = conf.get(propertyName); + Preconditions.checkNotNull(propertyString, + "Property " + propertyName + " not found in provided configuration"); + return propertyString; + } + + private static int getIntProperty(Configuration conf, String propertyName) { + return Integer.parseInt(getStringProperty(conf, propertyName)); + } + + /** + * @see {@link InputContext#getDagIdentifier} + * @param conf configuration instance + * @return dag index + */ + @Public + public static int getDagIndex(Configuration conf) { + return getIntProperty(conf, MRInput.TEZ_MAPREDUCE_DAG_INDEX); + } + + /** + * * @see {@link InputContext#getTaskVertexIndex} + * @param conf configuration instance + * @return vertex index + */ + @Public + public 
static int getVertexIndex(Configuration conf) { + return getIntProperty(conf, MRInput.TEZ_MAPREDUCE_VERTEX_INDEX); + } + + /** + * @see {@link InputContext#getTaskIndex} + * @param conf configuration instance + * @return task index + */ + @Public + public static int getTaskIndex(Configuration conf) { + return getIntProperty(conf, MRInput.TEZ_MAPREDUCE_TASK_INDEX); + } + + /** + * @see {@link InputContext#getTaskAttemptNumber} + * @param conf configuration instance + * @return task attempt index + */ + @Public + public static int getTaskAttemptIndex(Configuration conf) { + return getIntProperty(conf, MRInput.TEZ_MAPREDUCE_TASK_ATTEMPT_INDEX); + } + + /** + * @see {@link InputContext#getInputIndex} + * @param conf configuration instance + * @return input index + */ + @Public + public static int getInputIndex(Configuration conf) { + return getIntProperty(conf, MRInput.TEZ_MAPREDUCE_INPUT_INDEX); + } + + /** + * @see {@link InputContext#getDAGName} + * @param conf configuration instance + * @return dag name + */ + @Public + public static String getDagName(Configuration conf) { + return getStringProperty(conf, MRInput.TEZ_MAPREDUCE_DAG_NAME); + } + + /** + * @see {@link InputContext#getTaskVertexName} + * @param conf configuration instance + * @return vertex name + */ + @Public + public static String getVertexName(Configuration conf) { + return getStringProperty(conf, MRInput.TEZ_MAPREDUCE_VERTEX_NAME); + } + + /** + * @see {@link InputContext#getSourceVertexName} + * @param conf configuration instance + * @return source name + */ + @Public + public static String getInputName(Configuration conf) { + return getStringProperty(conf, MRInput.TEZ_MAPREDUCE_INPUT_NAME); + } + + /** + * @see {@link InputContext#getApplicationId} + * @param conf configuration instance + * @return applicationId as a string + */ + @Public + public static String getApplicationIdString(Configuration conf) { + return getStringProperty(conf, MRInput.TEZ_MAPREDUCE_APPLICATION_ID); + } + + /** + * @see 
{@link InputContext#getUniqueIdentifier} + * @param conf configuration instance + * @return unique identifier for the input + */ + @Public + public static String getUniqueIdentifier(Configuration conf) { + return getStringProperty(conf, MRInput.TEZ_MAPREDUCE_UNIQUE_IDENTIFIER); + } + + /** + * @see {@link InputContext#getDAGAttemptNumber} + * @param conf configuration instance + * @return attempt number + */ + @Public + public static int getDagAttemptNumber(Configuration conf) { + return getIntProperty(conf, MRInput.TEZ_MAPREDUCE_DAG_ATTEMPT_NUMBER); + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/99c85d3f/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java index b68d135..4a4ba86 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java @@ -82,7 +82,21 @@ import com.google.common.collect.Lists; */ @Public public class MRInput extends MRInputBase { - + + @Private public static final String TEZ_MAPREDUCE_DAG_INDEX = "tez.mapreduce.dag.index"; + @Private public static final String TEZ_MAPREDUCE_DAG_NAME = "tez.mapreduce.dag.name"; + @Private public static final String TEZ_MAPREDUCE_VERTEX_INDEX = "tez.mapreduce.vertex.index"; + @Private public static final String TEZ_MAPREDUCE_VERTEX_NAME = "tez.mapreduce.vertex.name"; + @Private public static final String TEZ_MAPREDUCE_TASK_INDEX = "tez.mapreduce.task.index"; + @Private public static final String TEZ_MAPREDUCE_TASK_ATTEMPT_INDEX = "tez.mapreduce.task.attempt.index"; + @Private public static final String TEZ_MAPREDUCE_INPUT_INDEX = "tez.mapreduce.input.index"; + @Private public static final String TEZ_MAPREDUCE_INPUT_NAME = "tez.mapreduce.input.name"; + @Private 
public static final String TEZ_MAPREDUCE_APPLICATION_ID = "tez.mapreduce.application.id"; + @Private public static final String TEZ_MAPREDUCE_UNIQUE_IDENTIFIER = "tez.mapreduce.unique.identifier"; + @Private public static final String TEZ_MAPREDUCE_DAG_ATTEMPT_NUMBER = "tez.mapreduce.dag.attempt.number"; + + + /** * Helper class to configure {@link MRInput} * http://git-wip-us.apache.org/repos/asf/tez/blob/99c85d3f/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/base/MRInputBase.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/base/MRInputBase.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/base/MRInputBase.java index e4aa7e2..230f55e 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/base/MRInputBase.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/base/MRInputBase.java @@ -32,6 +32,7 @@ import org.apache.tez.common.counters.TaskCounter; import org.apache.tez.common.counters.TezCounter; import org.apache.tez.mapreduce.hadoop.MRInputHelpers; import org.apache.tez.mapreduce.hadoop.MRJobConfig; +import org.apache.tez.mapreduce.input.MRInput; import org.apache.tez.mapreduce.protos.MRRuntimeProtos; import org.apache.tez.runtime.api.AbstractLogicalInput; import org.apache.tez.runtime.api.Event; @@ -96,6 +97,17 @@ public abstract class MRInputBase extends AbstractLogicalInput { taskAttemptId.toString()); jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, getContext().getDAGAttemptNumber()); + jobConf.setInt(MRInput.TEZ_MAPREDUCE_DAG_INDEX, getContext().getDagIdentifier()); + jobConf.setInt(MRInput.TEZ_MAPREDUCE_VERTEX_INDEX, getContext().getTaskVertexIndex()); + jobConf.setInt(MRInput.TEZ_MAPREDUCE_TASK_INDEX, getContext().getTaskIndex()); + jobConf.setInt(MRInput.TEZ_MAPREDUCE_TASK_ATTEMPT_INDEX, getContext().getTaskAttemptNumber()); + jobConf.set(MRInput.TEZ_MAPREDUCE_DAG_NAME, 
getContext().getDAGName()); + jobConf.set(MRInput.TEZ_MAPREDUCE_VERTEX_NAME, getContext().getTaskVertexName()); + jobConf.setInt(MRInput.TEZ_MAPREDUCE_INPUT_INDEX, getContext().getInputIndex()); + jobConf.set(MRInput.TEZ_MAPREDUCE_INPUT_NAME, getContext().getSourceVertexName()); + jobConf.set(MRInput.TEZ_MAPREDUCE_APPLICATION_ID, getContext().getApplicationId().toString()); + jobConf.set(MRInput.TEZ_MAPREDUCE_UNIQUE_IDENTIFIER, getContext().getUniqueIdentifier()); + jobConf.setInt(MRInput.TEZ_MAPREDUCE_DAG_ATTEMPT_NUMBER, getContext().getDAGAttemptNumber()); this.inputRecordCounter = getContext().getCounters().findCounter( TaskCounter.INPUT_RECORDS_PROCESSED); http://git-wip-us.apache.org/repos/asf/tez/blob/99c85d3f/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java index 448b90c..b42ef25 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java @@ -14,6 +14,7 @@ package org.apache.tez.mapreduce.input; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -22,17 +23,28 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import java.io.DataInput; +import java.io.DataOutput; import java.io.IOException; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import 
org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.DataSourceDescriptor; +import org.apache.tez.mapreduce.hadoop.MRInputHelpers; +import org.apache.tez.mapreduce.protos.MRRuntimeProtos; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.InputContext; +import org.apache.tez.runtime.api.events.InputDataInformationEvent; import org.junit.Test; public class TestMRInput { @@ -47,6 +59,10 @@ public class TestMRInput { ApplicationId applicationId = ApplicationId.newInstance(1000, 1); doReturn(dsd.getInputDescriptor().getUserPayload()).when(inputContext).getUserPayload(); doReturn(applicationId).when(inputContext).getApplicationId(); + doReturn("dagName").when(inputContext).getDAGName(); + doReturn("vertexName").when(inputContext).getTaskVertexName(); + doReturn("inputName").when(inputContext).getSourceVertexName(); + doReturn("uniqueIdentifier").when(inputContext).getUniqueIdentifier(); doReturn(1).when(inputContext).getTaskIndex(); doReturn(1).when(inputContext).getTaskAttemptNumber(); doReturn(new TezCounters()).when(inputContext).getCounters(); @@ -69,4 +85,139 @@ public class TestMRInput { assertTrue(e instanceof IllegalStateException); } } + + private static final String TEST_ATTRIBUTES_DAG_NAME = "dagName"; + private static final String TEST_ATTRIBUTES_VERTEX_NAME = "vertexName"; + private static final String TEST_ATTRIBUTES_INPUT_NAME = "inputName"; + private static final ApplicationId TEST_ATTRIBUTES_APPLICATION_ID = ApplicationId.newInstance(0, 0); + private static final String TEST_ATTRIBUTES_UNIQUE_IDENTIFIER = "uniqueId"; + private static final int TEST_ATTRIBUTES_DAG_INDEX = 1000; + private static final int TEST_ATTRIBUTES_VERTEX_INDEX = 2000; + private static final int TEST_ATTRIBUTES_TASK_INDEX = 3000; + 
private static final int TEST_ATTRIBUTES_TASK_ATTEMPT_INDEX = 4000; + private static final int TEST_ATTRIBUTES_INPUT_INDEX = 5000; + private static final int TEST_ATTRIBUTES_DAG_ATTEMPT_NUMBER = 6000; + + @Test(timeout = 5000) + public void testAttributesInJobConf() throws Exception { + InputContext inputContext = mock(InputContext.class); + doReturn(TEST_ATTRIBUTES_DAG_INDEX).when(inputContext).getDagIdentifier(); + doReturn(TEST_ATTRIBUTES_VERTEX_INDEX).when(inputContext).getTaskVertexIndex(); + doReturn(TEST_ATTRIBUTES_TASK_INDEX).when(inputContext).getTaskIndex(); + doReturn(TEST_ATTRIBUTES_TASK_ATTEMPT_INDEX).when(inputContext).getTaskAttemptNumber(); + doReturn(TEST_ATTRIBUTES_INPUT_INDEX).when(inputContext).getInputIndex(); + doReturn(TEST_ATTRIBUTES_DAG_ATTEMPT_NUMBER).when(inputContext).getDAGAttemptNumber(); + doReturn(TEST_ATTRIBUTES_DAG_NAME).when(inputContext).getDAGName(); + doReturn(TEST_ATTRIBUTES_VERTEX_NAME).when(inputContext).getTaskVertexName(); + doReturn(TEST_ATTRIBUTES_INPUT_NAME).when(inputContext).getSourceVertexName(); + doReturn(TEST_ATTRIBUTES_APPLICATION_ID).when(inputContext).getApplicationId(); + doReturn(TEST_ATTRIBUTES_UNIQUE_IDENTIFIER).when(inputContext).getUniqueIdentifier(); + + + DataSourceDescriptor dsd = MRInput.createConfigBuilder(new Configuration(false), + TestInputFormat.class).groupSplits(false).build(); + + doReturn(dsd.getInputDescriptor().getUserPayload()).when(inputContext).getUserPayload(); + doReturn(new TezCounters()).when(inputContext).getCounters(); + + + MRInput mrInput = new MRInput(inputContext, 1); + mrInput.initialize(); + + MRRuntimeProtos.MRSplitProto splitProto = + MRRuntimeProtos.MRSplitProto.newBuilder().setSplitClassName(TestInputSplit.class.getName()) + .build(); + InputDataInformationEvent diEvent = InputDataInformationEvent + .createWithSerializedPayload(0, splitProto.toByteString().asReadOnlyByteBuffer()); + + List<Event> events = new LinkedList<>(); + events.add(diEvent); + 
mrInput.handleEvents(events); + assertTrue(TestInputFormat.invoked.get()); + } + + + /** + * Test class to verify + */ + static class TestInputFormat implements InputFormat { + + private static final AtomicBoolean invoked = new AtomicBoolean(false); + + @Override + public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { + return null; + } + + @Override + public RecordReader getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws + IOException { + assertEquals(TEST_ATTRIBUTES_DAG_NAME, MRInputHelpers.getDagName(job)); + assertEquals(TEST_ATTRIBUTES_VERTEX_NAME, MRInputHelpers.getVertexName(job)); + assertEquals(TEST_ATTRIBUTES_INPUT_NAME, MRInputHelpers.getInputName(job)); + assertEquals(TEST_ATTRIBUTES_DAG_INDEX, MRInputHelpers.getDagIndex(job)); + assertEquals(TEST_ATTRIBUTES_VERTEX_INDEX, MRInputHelpers.getVertexIndex(job)); + assertEquals(TEST_ATTRIBUTES_APPLICATION_ID.toString(), MRInputHelpers.getApplicationIdString(job)); + assertEquals(TEST_ATTRIBUTES_UNIQUE_IDENTIFIER, MRInputHelpers.getUniqueIdentifier(job)); + assertEquals(TEST_ATTRIBUTES_TASK_INDEX, MRInputHelpers.getTaskIndex(job)); + assertEquals(TEST_ATTRIBUTES_TASK_ATTEMPT_INDEX, MRInputHelpers.getTaskAttemptIndex(job)); + assertEquals(TEST_ATTRIBUTES_INPUT_INDEX, MRInputHelpers.getInputIndex(job)); + assertEquals(TEST_ATTRIBUTES_DAG_ATTEMPT_NUMBER, MRInputHelpers.getDagAttemptNumber(job)); + invoked.set(true); + return new RecordReader() { + @Override + public boolean next(Object key, Object value) throws IOException { + return false; + } + + @Override + public Object createKey() { + return null; + } + + @Override + public Object createValue() { + return null; + } + + @Override + public long getPos() throws IOException { + return 0; + } + + @Override + public void close() throws IOException { + + } + + @Override + public float getProgress() throws IOException { + return 0; + } + }; + } + } + + public static class TestInputSplit implements InputSplit { + + 
@Override + public long getLength() throws IOException { + return 0; + } + + @Override + public String[] getLocations() throws IOException { + return new String[0]; + } + + @Override + public void write(DataOutput out) throws IOException { + + } + + @Override + public void readFields(DataInput in) throws IOException { + + } + } } http://git-wip-us.apache.org/repos/asf/tez/blob/99c85d3f/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java index db5643e..1733bfc 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java @@ -33,6 +33,7 @@ import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Random; +import java.util.UUID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -312,6 +313,7 @@ public class TestMultiMRInput { doReturn(1).when(inputContext).getTaskAttemptNumber(); doReturn(1).when(inputContext).getTaskIndex(); doReturn(1).when(inputContext).getTaskVertexIndex(); + doReturn(UUID.randomUUID().toString()).when(inputContext).getUniqueIdentifier(); doReturn("taskVertexName").when(inputContext).getTaskVertexName(); doReturn(UserPayload.create(ByteBuffer.wrap(payload))).when(inputContext).getUserPayload(); return inputContext;
