Repository: tez Updated Branches: refs/heads/master 8ade35c8d -> b6ce703c6
TEZ-2293. When running in "mr" mode, always use MR config settings. (hitesh) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/b6ce703c Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/b6ce703c Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/b6ce703c Branch: refs/heads/master Commit: b6ce703c67e07664dc4e4de300dccecc3717c5cc Parents: 8ade35c Author: Hitesh Shah <[email protected]> Authored: Wed Apr 8 15:10:13 2015 -0700 Committer: Hitesh Shah <[email protected]> Committed: Wed Apr 8 15:10:13 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/tez/mapreduce/client/YARNRunner.java | 2 +- .../apache/tez/mapreduce/hadoop/MRHelpers.java | 28 ++++++++++++++++---- .../tez/mapreduce/hadoop/TestMRHelpers.java | 24 +++++++++++++++++ .../tez/mapreduce/examples/MRRSleepJob.java | 22 ++++++++------- 5 files changed, 61 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/b6ce703c/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 2f2a790..f660feb 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly ALL CHANGES: + TEZ-2293. When running in "mr" mode, always use MR config settings. TEZ-2273. Tez UI: Support client side searching & sorting for dag tasks page TEZ-1909. Remove need to copy over all events from attempt 1 to attempt 2 dir TEZ-2223. TestMockDAGAppMaster fails due to TEZ-2210 on mac. 
http://git-wip-us.apache.org/repos/asf/tez/blob/b6ce703c/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java index 9750baf..8d2cfd5 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java @@ -573,7 +573,7 @@ public class YARNRunner implements ClientProtocol { // Transform all confs to use Tez keys for (int i = 0; i < stageConfs.length; i++) { - MRHelpers.translateMRConfToTez(stageConfs[i]); + MRHelpers.translateMRConfToTez(stageConfs[i], false); /* mr mode: MR values override any Tez values already set */ } // create inputs to tezClient.submit() http://git-wip-us.apache.org/repos/asf/tez/blob/b6ce703c/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java index 6190628..ed021fb 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java @@ -59,10 +59,24 @@ public class MRHelpers { * @param conf mr based configuration to be translated to tez */ public static void translateMRConfToTez(Configuration conf) { - convertVertexConfToTez(conf); + translateMRConfToTez(conf, true); } /** + * Translate MapReduce configuration keys to the equivalent Tez keys in the provided + * configuration. The translation is done in place. </p> + * This method is meant to be used by frameworks which rely upon existing MapReduce configuration + * instead of setting up their own.
+ * + * @param conf mr based configuration to be translated to tez + * @param preferTez If true, a Tez key that is already set keeps its value; if false, the MR value overwrites it (the MR key is removed either way) + */ + public static void translateMRConfToTez(Configuration conf, boolean preferTez) { + convertVertexConfToTez(conf, preferTez); + } + + + /** * Update the provided configuration to use the new API (mapreduce) or the old API (mapred) based * on the configured InputFormat, OutputFormat, Partitioner etc. Also ensures that keys not * required by a particular mode are not present. </p> @@ -90,9 +104,9 @@ public class MRHelpers { } } - private static void convertVertexConfToTez(Configuration vertexConf) { + private static void convertVertexConfToTez(Configuration vertexConf, boolean preferTez) { setStageKeysFromBaseConf(vertexConf, vertexConf, "unknown"); - processDirectConversion(vertexConf); + processDirectConversion(vertexConf, preferTez); setupMRComponents(vertexConf); } @@ -162,7 +176,7 @@ public class MRHelpers { } } - private static void processDirectConversion(Configuration conf) { + private static void processDirectConversion(Configuration conf, boolean preferTez) { for (Map.Entry<String, String> dep : DeprecatedKeys.getMRToTezRuntimeParamMap().entrySet()) { if (conf.get(dep.getKey()) != null) { // TODO Deprecation reason does not seem to reflect in the config ?
@@ -173,10 +187,14 @@ public class MRHelpers { conf.unset(dep.getKey()); if (tezValue == null) { conf.set(dep.getValue(), mrValue, "TRANSLATED_TO_TEZ"); + } else if (!preferTez) { /* Tez key already set, but the caller asked for MR values to take precedence */ + conf.set(dep.getValue(), mrValue, "TRANSLATED_TO_TEZ_AND_MR_OVERRIDE"); } if (LOG.isDebugEnabled()) { LOG.debug("Config: mr(unset):" + dep.getKey() + ", mr initial value=" - + mrValue + ", tez:" + dep.getValue() + "=" + conf.get(dep.getValue())); + + mrValue + + ", tez(original):" + dep.getValue() + "=" + tezValue + + ", tez(final):" + dep.getValue() + "=" + conf.get(dep.getValue())); } } } http://git-wip-us.apache.org/repos/asf/tez/blob/b6ce703c/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRHelpers.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRHelpers.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRHelpers.java index b7d22b1..9766cc0 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRHelpers.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRHelpers.java @@ -29,6 +29,7 @@ import org.apache.hadoop.util.Shell; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.tez.dag.api.TezConstants; +import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.junit.Assert; import org.junit.Test; @@ -189,4 +190,27 @@ public class TestMRHelpers { Assert.assertEquals("foo2", env.get("user")); Assert.assertEquals(("bar" + File.pathSeparator + "bar2"), env.get("foo")); } + + @Test(timeout = 5000) + public void testTranslateMRConfToTez() { + Configuration conf = new Configuration(false); + conf.setLong(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 1000); + conf.setLong(org.apache.tez.mapreduce.hadoop.MRJobConfig.IO_SORT_MB, 500); + + Configuration conf1 = new Configuration(conf); + 
MRHelpers.translateMRConfToTez(conf1); /* single-arg overload keeps the existing Tez value */ + Assert.assertNull(conf1.get(org.apache.tez.mapreduce.hadoop.MRJobConfig.IO_SORT_MB)); + Assert.assertEquals(1000, conf1.getLong(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 0)); + + Configuration conf2 = new Configuration(conf); + MRHelpers.translateMRConfToTez(conf2, true); + Assert.assertNull(conf2.get(org.apache.tez.mapreduce.hadoop.MRJobConfig.IO_SORT_MB)); + Assert.assertEquals(1000, conf2.getLong(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 0)); + + Configuration conf3 = new Configuration(conf); + MRHelpers.translateMRConfToTez(conf3, false); /* MR value (500) overrides the existing Tez value */ + Assert.assertNull(conf3.get(org.apache.tez.mapreduce.hadoop.MRJobConfig.IO_SORT_MB)); + Assert.assertEquals(500, conf3.getLong(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 0)); + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/b6ce703c/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java index dc5847c..7204cb2 100644 --- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java +++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java @@ -429,7 +429,7 @@ public class MRRSleepJob extends Configured implements Tool { NullOutputFormat.class.getName()); } - MRHelpers.translateMRConfToTez(mapStageConf); + MRHelpers.translateMRConfToTez(mapStageConf, false); Configuration[] intermediateReduceStageConfs = null; if (iReduceStagesCount > 0 @@ -450,7 +450,7 @@ public class MRRSleepJob extends Configured implements Tool { MRRSleepJobPartitioner.class.getName()); - MRHelpers.translateMRConfToTez(iReduceStageConf); + MRHelpers.translateMRConfToTez(iReduceStageConf, false); 
intermediateReduceStageConfs[i-1] = iReduceStageConf; } } @@ -469,7 +469,7 @@ public class MRRSleepJob extends 
Configured implements Tool { finalReduceConf.set(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, NullOutputFormat.class.getName()); - MRHelpers.translateMRConfToTez(finalReduceConf); + MRHelpers.translateMRConfToTez(finalReduceConf, false); } MRHelpers.configureMRApiUsage(mapStageConf); @@ -573,16 +573,18 @@ public class MRRSleepJob extends Configured implements Tool { } - Map<String, String> partitionerConf = Maps.newHashMap(); - partitionerConf.put(MRJobConfig.PARTITIONER_CLASS_ATTR, MRRSleepJobPartitioner.class.getName()); - OrderedPartitionedKVEdgeConfig edgeConf = OrderedPartitionedKVEdgeConfig - .newBuilder(IntWritable.class.getName(), IntWritable.class.getName(), - HashPartitioner.class.getName(), partitionerConf).configureInput().useLegacyInput() - .done().build(); - for (int i = 0; i < vertices.size(); ++i) { dag.addVertex(vertices.get(i)); if (i != 0) { + Map<String, String> partitionerConf = Maps.newHashMap(); + partitionerConf.put( + MRJobConfig.PARTITIONER_CLASS_ATTR, MRRSleepJobPartitioner.class.getName()); + Configuration edgeConfiguration = ((i+1) == vertices.size()) ? + finalReduceConf : intermediateReduceStageConfs[i - 1]; /* intermediate stage i's conf is stored at index i-1 (see intermediateReduceStageConfs[i-1] = iReduceStageConf) */ + OrderedPartitionedKVEdgeConfig edgeConf = OrderedPartitionedKVEdgeConfig + .newBuilder(IntWritable.class.getName(), IntWritable.class.getName(), + HashPartitioner.class.getName(), partitionerConf).configureInput().useLegacyInput() + .done().setFromConfiguration(edgeConfiguration).build(); dag.addEdge( Edge.create(vertices.get(i - 1), vertices.get(i), edgeConf.createDefaultEdgeProperty())); }
