Repository: tez
Updated Branches:
  refs/heads/master fa72bdc7f -> beb77856e
TEZ-2033. Update TestOrderedWordCount to add processor configs as history text and use MR configs correctly. (hitesh) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/beb77856 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/beb77856 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/beb77856 Branch: refs/heads/master Commit: beb77856eb6eadf11001df28975f30ed8752c7f0 Parents: fa72bdc Author: Hitesh Shah <[email protected]> Authored: Tue Apr 14 11:31:17 2015 -0700 Committer: Hitesh Shah <[email protected]> Committed: Tue Apr 14 11:31:17 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 2 ++ .../examples/TestOrderedWordCount.java | 35 ++++++++++---------- 2 files changed, 20 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/beb77856/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index b47cf7d..cdc48e1 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -9,6 +9,8 @@ INCOMPATIBLE CHANGES TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly ALL CHANGES: + TEZ-2033. Update TestOrderedWordCount to add processor configs as history text + and use MR configs correctly TEZ-2318. Tez UI: source and sink page is broken as they are not populated. TEZ-2016. Tez UI: Dag View Fit and Finish TEZ-2252. 
Tez UI: in graphical view some of the sinks are hidden as they overlap http://git-wip-us.apache.org/repos/asf/tez/blob/beb77856/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java index 92e3a53..4a05dab 100644 --- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java +++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java @@ -162,8 +162,7 @@ public class TestOrderedWordCount extends Configured implements Tool { Configuration mapStageConf = new JobConf(conf); mapStageConf.set(MRJobConfig.MAP_CLASS_ATTR, TokenizerMapper.class.getName()); - - MRHelpers.translateMRConfToTez(mapStageConf); + MRHelpers.translateMRConfToTez(mapStageConf, !useMRSettings); Configuration iReduceStageConf = new JobConf(conf); // TODO replace with auto-reduce parallelism @@ -174,7 +173,7 @@ public class TestOrderedWordCount extends Configured implements Tool { iReduceStageConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, IntWritable.class.getName()); iReduceStageConf.setBoolean("mapred.mapper.new-api", true); - MRHelpers.translateMRConfToTez(iReduceStageConf); + MRHelpers.translateMRConfToTez(iReduceStageConf, !useMRSettings); Configuration finalReduceConf = new JobConf(conf); finalReduceConf.setInt(MRJobConfig.NUM_REDUCES, 1); @@ -182,7 +181,7 @@ public class TestOrderedWordCount extends Configured implements Tool { MyOrderByNoOpReducer.class.getName()); finalReduceConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, IntWritable.class.getName()); finalReduceConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, Text.class.getName()); - MRHelpers.translateMRConfToTez(finalReduceConf); + MRHelpers.translateMRConfToTez(finalReduceConf, 
!useMRSettings); MRHelpers.configureMRApiUsage(mapStageConf); MRHelpers.configureMRApiUsage(iReduceStageConf); @@ -190,19 +189,22 @@ public class TestOrderedWordCount extends Configured implements Tool { List<Vertex> vertices = new ArrayList<Vertex>(); - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(4096); - mapStageConf.writeXml(outputStream); - String mapStageHistoryText = new String(outputStream.toByteArray(), "UTF-8"); + String mapStageHistoryText = TezUtils.convertToHistoryText("Initial Tokenizer Vertex", + mapStageConf); DataSourceDescriptor dsd; if (generateSplitsInClient) { mapStageConf.set(MRJobConfig.INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class.getName()); mapStageConf.set(FileInputFormat.INPUT_DIR, inputPath); mapStageConf.setBoolean("mapred.mapper.new-api", true); - dsd = MRInputHelpers.configureMRInputWithLegacySplitGeneration(mapStageConf, stagingDir, true); + dsd = MRInputHelpers.configureMRInputWithLegacySplitGeneration(mapStageConf, stagingDir, + true); } else { - dsd = MRInputLegacy.createConfigBuilder(mapStageConf, TextInputFormat.class, inputPath).build(); + dsd = MRInputLegacy.createConfigBuilder(mapStageConf, TextInputFormat.class, + inputPath).build(); } + dsd.getInputDescriptor().setHistoryText(TezUtils.convertToHistoryText( + "HDFS Input " + inputPath, mapStageConf)); Map<String, String> mapEnv = Maps.newHashMap(); MRHelpers.updateEnvBasedOnMRTaskEnv(mapStageConf, mapEnv, true); @@ -227,10 +229,8 @@ public class TestOrderedWordCount extends Configured implements Tool { .addDataSource("MRInput", dsd); vertices.add(mapVertex); - ByteArrayOutputStream iROutputStream = new ByteArrayOutputStream(4096); - iReduceStageConf.writeXml(iROutputStream); - String iReduceStageHistoryText = new String(iROutputStream.toByteArray(), "UTF-8"); - + String iReduceStageHistoryText = TezUtils.convertToHistoryText("Intermediate Summation Vertex", + iReduceStageConf); ProcessorDescriptor iReduceProcessorDescriptor = ProcessorDescriptor.create( 
ReduceProcessor.class.getName()) .setUserPayload(TezUtils.createUserPayloadFromConf(iReduceStageConf)) @@ -249,9 +249,8 @@ public class TestOrderedWordCount extends Configured implements Tool { intermediateVertex.addTaskLocalFiles(commonLocalResources); vertices.add(intermediateVertex); - ByteArrayOutputStream finalReduceOutputStream = new ByteArrayOutputStream(4096); - finalReduceConf.writeXml(finalReduceOutputStream); - String finalReduceStageHistoryText = new String(finalReduceOutputStream.toByteArray(), "UTF-8"); + String finalReduceStageHistoryText = TezUtils.convertToHistoryText("Final Sorter Vertex", + finalReduceConf); UserPayload finalReducePayload = TezUtils.createUserPayloadFromConf(finalReduceConf); Vertex finalReduceVertex; @@ -272,6 +271,8 @@ public class TestOrderedWordCount extends Configured implements Tool { finalReduceVertex.addDataSink("MROutput", MROutputLegacy.createConfigBuilder(finalReduceConf, TextOutputFormat.class, outputPath) .build()); + finalReduceVertex.getDataSinks().get(0).getOutputDescriptor().setHistoryText( + TezUtils.convertToHistoryText("HDFS Output " + outputPath, finalReduceConf)); vertices.add(finalReduceVertex); DAG dag = DAG.create("OrderedWordCount" + dagIndex); @@ -546,7 +547,7 @@ public class TestOrderedWordCount extends Configured implements Tool { } public static void main(String[] args) throws Exception { - int res = ToolRunner.run(new Configuration(), new TestOrderedWordCount(), args); + int res = ToolRunner.run(new TezConfiguration(), new TestOrderedWordCount(), args); System.exit(res); } }
