Repository: tez
Updated Branches:
  refs/heads/branch-0.8 ef5bd642f -> aeae7c97b


TEZ-3235. Modify Example TestOrderedWordCount job to test the IPC limit for large dag plans. (Sushmitha Sreenivasan via hitesh)

(cherry picked from commit 670691c352d6e80400d40f1bdb9f5b87368b8f9f)

Conflicts:
        CHANGES.txt


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/aeae7c97
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/aeae7c97
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/aeae7c97

Branch: refs/heads/branch-0.8
Commit: aeae7c97b7d35dba72b6f843c3df462efa002f13
Parents: ef5bd64
Author: Hitesh Shah <[email protected]>
Authored: Wed Jul 13 14:21:29 2016 -0700
Committer: Hitesh Shah <[email protected]>
Committed: Wed Jul 13 14:25:56 2016 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../examples/TestOrderedWordCount.java          | 72 ++++++++++++++++++--
 2 files changed, 67 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/aeae7c97/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 885abbc..9e8c0e3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3235. Modify Example TestOrderedWordCount job to test the IPC limit for large dag plans.
   TEZ-3337. Do not log empty fields of TaskAttemptFinishedEvent to avoid confusion.
   TEZ-1248. Reduce slow-start should special case 1 reducer runs.
 

http://git-wip-us.apache.org/repos/asf/tez/blob/aeae7c97/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
index 6ed6d2d..5f5fbfc 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
@@ -29,8 +29,10 @@ import java.util.StringTokenizer;
 import java.util.TreeMap;
 
 import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang.RandomStringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
@@ -102,7 +104,18 @@ public class TestOrderedWordCount extends Configured implements Tool {
 
   private static final String DAG_VIEW_ACLS = 
"tez.testorderedwordcount.view-acls";
   private static final String DAG_MODIFY_ACLS = 
"tez.testorderedwordcount.modify-acls";
-
+  /**
+   * IS_MAX_IPC_DATA_SET_BY_USER is a boolean value which is set to true when 
MAX_IPC_DATA_LENGTH is set by user
+   * use -Dtez.testorderedwordcount.ipc.maximum.data.length to set the maximum 
IPC Data limit in MB
+   * use -Dtez.testorderedwordcount.exceed.ipc.limit in MB to exceed the 
MAX_IPC_DATA_LENGTH value
+   * IPC_PAYLOAD value is a random string generated for each vertex such that 
MAX_IPC_DATA_LENGTH is violated
+   * NO_OF_VERTICES is the total number of vertices in testOrderedWordCount dag
+   */
+  private static final String IS_MAX_IPC_DATA_SET_BY_USER = 
"tez.testorderedwordcount.is-max-ipc-set-by-user";
+  private static final String MAX_IPC_DATA_LENGTH = 
"tez.testorderedwordcount.ipc.maximum.data.length";
+  private static final String EXCEED_IPC_DATA_LIMIT = 
"tez.testorderedwordcount.exceed.ipc.limit";
+  private static final String IPC_PAYLOAD = 
"tez.testorderedwordcount.ipc.payload";
+  private static final int NO_OF_VERTICES = 3;
 
   public static class TokenizerMapper
        extends Mapper<Object, Text, Text, IntWritable>{
@@ -110,6 +123,18 @@ public class TestOrderedWordCount extends Configured implements Tool {
     private final static IntWritable one = new IntWritable(1);
     private Text word = new Text();
 
+    public void setup(Context context) throws IOException, 
InterruptedException {
+      Configuration conf = context.getConfiguration();
+      if (conf.getBoolean(IS_MAX_IPC_DATA_SET_BY_USER, false)) {
+        LOG.info("Max IPC Data Length set : " + 
conf.getInt(MAX_IPC_DATA_LENGTH, -1) + " MB," +
+            " Exceed the Max IPC Data Length : " + 
conf.getInt(EXCEED_IPC_DATA_LIMIT, 3) + " MB," +
+            " Total Dag Payload sent through IPC : "
+            + (conf.getInt(MAX_IPC_DATA_LENGTH, -1) + 
conf.getInt(EXCEED_IPC_DATA_LIMIT, 3)) + " MB," +
+            " Each Vertex Processor payload : " +
+            ((conf.getInt(MAX_IPC_DATA_LENGTH, -1) + 
conf.getInt(EXCEED_IPC_DATA_LIMIT, 3))/NO_OF_VERTICES)+" MB");
+      }
+    }
+
     public void map(Object key, Text value, Context context
                     ) throws IOException, InterruptedException {
       StringTokenizer itr = new StringTokenizer(value.toString());
@@ -161,7 +186,9 @@ public class TestOrderedWordCount extends Configured implements Tool {
       int dagIndex, String inputPath, String outputPath,
       boolean generateSplitsInClient,
       boolean useMRSettings,
-      int intermediateNumReduceTasks) throws Exception {
+      int intermediateNumReduceTasks,
+      int maxDataLengthThroughIPC,
+      int exceedDataLimit) throws Exception {
 
     Configuration mapStageConf = new JobConf(conf);
     mapStageConf.set(MRJobConfig.MAP_CLASS_ATTR,
@@ -215,11 +242,14 @@ public class TestOrderedWordCount extends Configured implements Tool {
     Map<String, String> reduceEnv = Maps.newHashMap();
     MRHelpers.updateEnvBasedOnMRTaskEnv(mapStageConf, reduceEnv, false);
 
+    Configuration copyMapStageConf = new Configuration(mapStageConf);
+    setMaxDataLengthConf(copyMapStageConf, maxDataLengthThroughIPC, 
exceedDataLimit);
+
     Vertex mapVertex;
     ProcessorDescriptor mapProcessorDescriptor =
         ProcessorDescriptor.create(MapProcessor.class.getName())
             .setUserPayload(
-                TezUtils.createUserPayloadFromConf(mapStageConf))
+                TezUtils.createUserPayloadFromConf(copyMapStageConf))
             .setHistoryText(mapStageHistoryText);
     if (!useMRSettings) {
       mapVertex = Vertex.create("initialmap", mapProcessorDescriptor);
@@ -233,11 +263,14 @@ public class TestOrderedWordCount extends Configured implements Tool {
         .addDataSource("MRInput", dsd);
     vertices.add(mapVertex);
 
+    Configuration copyiReduceStageConf = new Configuration(iReduceStageConf);
+    setMaxDataLengthConf(copyiReduceStageConf, maxDataLengthThroughIPC, 
exceedDataLimit);
+
     String iReduceStageHistoryText = 
TezUtils.convertToHistoryText("Intermediate Summation Vertex",
         iReduceStageConf);
     ProcessorDescriptor iReduceProcessorDescriptor = 
ProcessorDescriptor.create(
         ReduceProcessor.class.getName())
-        .setUserPayload(TezUtils.createUserPayloadFromConf(iReduceStageConf))
+        
.setUserPayload(TezUtils.createUserPayloadFromConf(copyiReduceStageConf))
         .setHistoryText(iReduceStageHistoryText);
 
     Vertex intermediateVertex;
@@ -253,9 +286,12 @@ public class TestOrderedWordCount extends Configured implements Tool {
     intermediateVertex.addTaskLocalFiles(commonLocalResources);
     vertices.add(intermediateVertex);
 
+    Configuration copyFinalReduceConf = new Configuration(finalReduceConf);
+    setMaxDataLengthConf(copyFinalReduceConf, maxDataLengthThroughIPC, 
exceedDataLimit);
+
     String finalReduceStageHistoryText = TezUtils.convertToHistoryText("Final 
Sorter Vertex",
         finalReduceConf);
-    UserPayload finalReducePayload = 
TezUtils.createUserPayloadFromConf(finalReduceConf);
+    UserPayload finalReducePayload = 
TezUtils.createUserPayloadFromConf(copyFinalReduceConf);
     Vertex finalReduceVertex;
 
     ProcessorDescriptor finalReduceProcessorDescriptor =
@@ -304,6 +340,24 @@ public class TestOrderedWordCount extends Configured implements Tool {
 
     return dag;
   }
+  private void setMaxDataLengthConf(Configuration config, int 
maxDataLengthThroughIPC, int exceedDataLimit) {
+    /**
+     * if -Dtez.testorderedwordcount.ipc.maximum.data.length is set by user,
+     * this function sets necessary configurations as below:
+     * IS_MAX_IPC_DATA_SET_BY_USER is set to true
+     * EXCEED_IPC_DATA_LIMIT = <N> MB is used to test successful dag 
submission when MAX_IPC_DATA_LENGTH exceeds by N
+     * Each vertex processor payload can be set to IPC_PAYLOAD so that the 
cumulative dag payload exceeds
+     * the tez.testorderedwordcount.ipc.maximum.data.length set
+     */
+    if (maxDataLengthThroughIPC > 0) {
+      config.setBoolean(IS_MAX_IPC_DATA_SET_BY_USER, true);
+      config.setInt(EXCEED_IPC_DATA_LIMIT, exceedDataLimit);
+      int payloadSize;
+      payloadSize = (((maxDataLengthThroughIPC * 1024 * 1024) + 
(exceedDataLimit * 1024 * 1024)) / NO_OF_VERTICES);
+      String payload = RandomStringUtils.randomAlphanumeric(payloadSize);
+      config.set(IPC_PAYLOAD, payload);
+    }
+  }
 
   private void updateDAGACls(Configuration conf, DAG dag, int dagIndex) {
     LOG.info("Checking DAG specific ACLS");
@@ -360,6 +414,11 @@ public class TestOrderedWordCount extends Configured implements Tool {
     boolean useMRSettings = conf.getBoolean("USE_MR_CONFIGS", true);
     // TODO needs to use auto reduce parallelism
     int intermediateNumReduceTasks = conf.getInt("IREDUCE_NUM_TASKS", 2);
+    int maxDataLengthThroughIPC = conf.getInt(MAX_IPC_DATA_LENGTH, -1);
+    int exceedDataLimit = conf.getInt(EXCEED_IPC_DATA_LIMIT, 3);
+    if (maxDataLengthThroughIPC > 0) {
+      conf.setInt(CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH, 
maxDataLengthThroughIPC * 1024 * 1024);
+    }
 
     if (((otherArgs.length%2) != 0)
         || (!useTezSession && otherArgs.length != 2)) {
@@ -451,7 +510,8 @@ public class TestOrderedWordCount extends Configured implements Tool {
         
         DAG dag = instance.createDAG(fs, tezConf, localResources,
             stagingDir, dagIndex, inputPath, outputPath,
-            generateSplitsInClient, useMRSettings, intermediateNumReduceTasks);
+            generateSplitsInClient, useMRSettings, intermediateNumReduceTasks,
+            maxDataLengthThroughIPC,exceedDataLimit);
         String callerType = "TestOrderedWordCount";
         String callerId = tezSession.getAppMasterApplicationId() == null ?
             ( "UnknownApp_" + System.currentTimeMillis() + dagIndex ) :

Reply via email to