Repository: tez

Updated Branches:
  refs/heads/master e6be19695 -> 670691c35
TEZ-3235. Modify Example TestOrderedWordCount job to test the IPC limit for large dag plans. (Sushmitha Sreenivasan via hitesh)

Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/670691c3
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/670691c3
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/670691c3

Branch: refs/heads/master
Commit: 670691c352d6e80400d40f1bdb9f5b87368b8f9f
Parents: e6be196
Author: Hitesh Shah <[email protected]>
Authored: Wed Jul 13 14:21:29 2016 -0700
Committer: Hitesh Shah <[email protected]>
Committed: Wed Jul 13 14:21:29 2016 -0700

----------------------------------------------------------------------
 CHANGES.txt                                        |  1 +
 .../examples/TestOrderedWordCount.java             | 72 ++++++++++++++++++--
 2 files changed, 67 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/670691c3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ff57f62..f2ed0ef 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES

 ALL CHANGES:

+  TEZ-3235. Modify Example TestOrderedWordCount job to test the IPC limit for large dag plans.
   TEZ-3337. Do not log empty fields of TaskAttemptFinishedEvent to avoid confusion.
   TEZ-3303. Have ShuffleVertexManager consume more precise partition stats.
   TEZ-1248. Reduce slow-start should special case 1 reducer runs.

http://git-wip-us.apache.org/repos/asf/tez/blob/670691c3/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
index af455fb..51e4be1 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
@@ -29,8 +29,10 @@ import java.util.StringTokenizer;
 import java.util.TreeMap;

 import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang.RandomStringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
@@ -102,7 +104,18 @@ public class TestOrderedWordCount extends Configured implements Tool {
   private static final String DAG_VIEW_ACLS = "tez.testorderedwordcount.view-acls";
   private static final String DAG_MODIFY_ACLS = "tez.testorderedwordcount.modify-acls";
-
+  /**
+   * IS_MAX_IPC_DATA_SET_BY_USER is a boolean value which is set to true when MAX_IPC_DATA_LENGTH is set by user
+   * use -Dtez.testorderedwordcount.ipc.maximum.data.length to set the maximum IPC Data limit in MB
+   * use -Dtez.testorderedwordcount.exceed.ipc.limit in MB to exceed the MAX_IPC_DATA_LENGTH value
+   * IPC_PAYLOAD value is a random string generated for each vertex such that MAX_IPC_DATA_LENGTH is violated
+   * NO_OF_VERTICES is the total number of vertices in testOrderedWordCount dag
+   */
+  private static final String IS_MAX_IPC_DATA_SET_BY_USER = "tez.testorderedwordcount.is-max-ipc-set-by-user";
+  private static final String MAX_IPC_DATA_LENGTH = "tez.testorderedwordcount.ipc.maximum.data.length";
+  private static final String EXCEED_IPC_DATA_LIMIT = "tez.testorderedwordcount.exceed.ipc.limit";
+  private static final String IPC_PAYLOAD = "tez.testorderedwordcount.ipc.payload";
+  private static final int NO_OF_VERTICES = 3;

   public static class TokenizerMapper
        extends Mapper<Object, Text, Text, IntWritable>{
@@ -110,6 +123,18 @@ public class TestOrderedWordCount extends Configured implements Tool {
     private final static IntWritable one = new IntWritable(1);
     private Text word = new Text();

+    public void setup(Context context) throws IOException, InterruptedException {
+      Configuration conf = context.getConfiguration();
+      if (conf.getBoolean(IS_MAX_IPC_DATA_SET_BY_USER, false)) {
+        LOG.info("Max IPC Data Length set : " + conf.getInt(MAX_IPC_DATA_LENGTH, -1) + " MB," +
+            " Exceed the Max IPC Data Length : " + conf.getInt(EXCEED_IPC_DATA_LIMIT, 3) + " MB," +
+            " Total Dag Payload sent through IPC : " +
+            (conf.getInt(MAX_IPC_DATA_LENGTH, -1) + conf.getInt(EXCEED_IPC_DATA_LIMIT, 3)) + " MB," +
+            " Each Vertex Processor payload : " +
+            ((conf.getInt(MAX_IPC_DATA_LENGTH, -1) + conf.getInt(EXCEED_IPC_DATA_LIMIT, 3))/NO_OF_VERTICES)+" MB");
+      }
+    }
+
     public void map(Object key, Text value, Context context
                     ) throws IOException, InterruptedException {
       StringTokenizer itr = new StringTokenizer(value.toString());
@@ -161,7 +186,9 @@ public class TestOrderedWordCount extends Configured implements Tool {
       int dagIndex, String inputPath, String outputPath,
       boolean generateSplitsInClient, boolean useMRSettings,
-      int intermediateNumReduceTasks) throws Exception {
+      int intermediateNumReduceTasks,
+      int maxDataLengthThroughIPC,
+      int exceedDataLimit) throws Exception {

     Configuration mapStageConf = new JobConf(conf);
     mapStageConf.set(MRJobConfig.MAP_CLASS_ATTR,
@@ -215,11 +242,14 @@ public class TestOrderedWordCount extends Configured implements Tool {
     Map<String, String> reduceEnv = Maps.newHashMap();
     MRHelpers.updateEnvBasedOnMRTaskEnv(mapStageConf, reduceEnv, false);

+    Configuration copyMapStageConf = new Configuration(mapStageConf);
+    setMaxDataLengthConf(copyMapStageConf, maxDataLengthThroughIPC, exceedDataLimit);
+
     Vertex mapVertex;
     ProcessorDescriptor mapProcessorDescriptor =
         ProcessorDescriptor.create(MapProcessor.class.getName())
             .setUserPayload(
-                TezUtils.createUserPayloadFromConf(mapStageConf))
+                TezUtils.createUserPayloadFromConf(copyMapStageConf))
             .setHistoryText(mapStageHistoryText);
     if (!useMRSettings) {
       mapVertex = Vertex.create("initialmap", mapProcessorDescriptor);
@@ -233,11 +263,14 @@ public class TestOrderedWordCount extends Configured implements Tool {
         .addDataSource("MRInput", dsd);
     vertices.add(mapVertex);

+    Configuration copyiReduceStageConf = new Configuration(iReduceStageConf);
+    setMaxDataLengthConf(copyiReduceStageConf, maxDataLengthThroughIPC, exceedDataLimit);
+
     String iReduceStageHistoryText = TezUtils.convertToHistoryText("Intermediate Summation Vertex",
         iReduceStageConf);
     ProcessorDescriptor iReduceProcessorDescriptor = ProcessorDescriptor.create(
         ReduceProcessor.class.getName())
-        .setUserPayload(TezUtils.createUserPayloadFromConf(iReduceStageConf))
+        .setUserPayload(TezUtils.createUserPayloadFromConf(copyiReduceStageConf))
         .setHistoryText(iReduceStageHistoryText);

     Vertex intermediateVertex;
@@ -253,9 +286,12 @@ public class TestOrderedWordCount extends Configured implements Tool {
     intermediateVertex.addTaskLocalFiles(commonLocalResources);
     vertices.add(intermediateVertex);

+    Configuration copyFinalReduceConf = new Configuration(finalReduceConf);
+    setMaxDataLengthConf(copyFinalReduceConf, maxDataLengthThroughIPC, exceedDataLimit);
+
     String finalReduceStageHistoryText = TezUtils.convertToHistoryText("Final Sorter Vertex",
         finalReduceConf);
-    UserPayload finalReducePayload = TezUtils.createUserPayloadFromConf(finalReduceConf);
+    UserPayload finalReducePayload = TezUtils.createUserPayloadFromConf(copyFinalReduceConf);

     Vertex finalReduceVertex;
     ProcessorDescriptor finalReduceProcessorDescriptor =
@@ -304,6 +340,24 @@ public class TestOrderedWordCount extends Configured implements Tool {
     return dag;
   }

+  private void setMaxDataLengthConf(Configuration config, int maxDataLengthThroughIPC, int exceedDataLimit) {
+    /**
+     * if -Dtez.testorderedwordcount.ipc.maximum.data.length is set by user,
+     * this function sets necessary configurations as below:
+     * IS_MAX_IPC_DATA_SET_BY_USER is set to true
+     * EXCEED_IPC_DATA_LIMIT = <N> MB is used to test successful dag submission when MAX_IPC_DATA_LENGTH exceeds by N
+     * Each vertex processor payload can be set to IPC_PAYLOAD so that the cumulative dag payload exceeds
+     * the tez.testorderedwordcount.ipc.maximum.data.length set
+     */
+    if (maxDataLengthThroughIPC > 0) {
+      config.setBoolean(IS_MAX_IPC_DATA_SET_BY_USER, true);
+      config.setInt(EXCEED_IPC_DATA_LIMIT, exceedDataLimit);
+      int payloadSize;
+      payloadSize = (((maxDataLengthThroughIPC * 1024 * 1024) + (exceedDataLimit * 1024 * 1024)) / NO_OF_VERTICES);
+      String payload = RandomStringUtils.randomAlphanumeric(payloadSize);
+      config.set(IPC_PAYLOAD, payload);
+    }
+  }
   private void updateDAGACls(Configuration conf, DAG dag, int dagIndex) {
     LOG.info("Checking DAG specific ACLS");
@@ -360,6 +414,11 @@ public class TestOrderedWordCount extends Configured implements Tool {
     boolean useMRSettings = conf.getBoolean("USE_MR_CONFIGS", true);
     // TODO needs to use auto reduce parallelism
     int intermediateNumReduceTasks = conf.getInt("IREDUCE_NUM_TASKS", 2);
+    int maxDataLengthThroughIPC = conf.getInt(MAX_IPC_DATA_LENGTH, -1);
+    int exceedDataLimit = conf.getInt(EXCEED_IPC_DATA_LIMIT, 3);
+    if (maxDataLengthThroughIPC > 0) {
+      conf.setInt(CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH, maxDataLengthThroughIPC * 1024 * 1024);
+    }

     if (((otherArgs.length%2) != 0)
         || (!useTezSession && otherArgs.length != 2)) {
@@ -453,7 +512,8 @@ public class TestOrderedWordCount extends Configured implements Tool {
         DAG dag = instance.createDAG(fs, tezConf, localResources,
             stagingDir, dagIndex, inputPath, outputPath,
-            generateSplitsInClient, useMRSettings, intermediateNumReduceTasks);
+            generateSplitsInClient, useMRSettings, intermediateNumReduceTasks,
+            maxDataLengthThroughIPC,exceedDataLimit);

         String callerType = "TestOrderedWordCount";
         String callerId = tezSession.getAppMasterApplicationId() == null ?
             ( "UnknownApp_" + System.currentTimeMillis() + dagIndex ) :
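
For readers following the diff, here is a minimal sketch of the payload sizing that setMaxDataLengthConf() performs. It is not part of the commit: the class name IpcPayloadSizeSketch is hypothetical, and the 64 MB / 3 MB values are illustrative assumptions (3 MB is the default read for tez.testorderedwordcount.exceed.ipc.limit in the patch).

    // Illustrative sketch only: mirrors the arithmetic in setMaxDataLengthConf().
    // Assumed settings: -Dtez.testorderedwordcount.ipc.maximum.data.length=64
    // and -Dtez.testorderedwordcount.exceed.ipc.limit=3 (both in MB).
    public class IpcPayloadSizeSketch {
      public static void main(String[] args) {
        int maxDataLengthThroughIPC = 64; // MB, hypothetical value for illustration
        int exceedDataLimit = 3;          // MB, the patch's default
        int noOfVertices = 3;             // matches NO_OF_VERTICES in TestOrderedWordCount
        int payloadSize = ((maxDataLengthThroughIPC * 1024 * 1024)
            + (exceedDataLimit * 1024 * 1024)) / noOfVertices;
        // 23418197 characters (~22.3 MB) of random alphanumeric payload per vertex
        // (RandomStringUtils.randomAlphanumeric(payloadSize) in the patch), so the
        // three vertex payloads together carry roughly 67 MB while the client sets
        // CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH to 64 MB.
        System.out.println("payload per vertex: " + payloadSize + " characters");
      }
    }

Per the in-code comment, the tez.testorderedwordcount.exceed.ipc.limit margin exists to verify that dag submission still succeeds when the cumulative dag plan payload exceeds the configured tez.testorderedwordcount.ipc.maximum.data.length.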
