Updated Branches: refs/heads/TEZ-1 9f7a21d36 -> 45ace5414
TEZ-102. Fix shuffle to read the correct comparator class from configuration . Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/45ace541 Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/45ace541 Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/45ace541 Branch: refs/heads/TEZ-1 Commit: 45ace5414295eb1a586adc2aa3153f5dfecbad0f Parents: 9f7a21d Author: Siddharth Seth <[email protected]> Authored: Thu May 2 16:12:49 2013 -0700 Committer: Hitesh Shah <[email protected]> Committed: Thu May 2 16:16:34 2013 -0700 ---------------------------------------------------------------------- .../java/org/apache/tez/common/TezJobConfig.java | 5 +- .../org/apache/tez/engine/common/ConfigUtils.java | 41 ++---------- .../tez/mapreduce/hadoop/DeprecatedKeys.java | 2 + .../hadoop/MultiStageMRConfToTezTranslator.java | 49 ++++++++++++++- .../processor/reduce/ReduceProcessor.java | 5 +- 5 files changed, 63 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/45ace541/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java ---------------------------------------------------------------------- diff --git a/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java b/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java index 72ba68f..e867458 100644 --- a/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java +++ b/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java @@ -263,7 +263,7 @@ public class TezJobConfig { "tez.engine.intermediate-output.key.comparator.class"; public static final String TEZ_ENGINE_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS = "tez.engine.intermediate-input.key.comparator.class"; - + public static final String TEZ_ENGINE_INTERMEDIATE_OUTPUT_KEY_CLASS = "tez.engine.intermediate-output.key.class"; public 
static final String TEZ_ENGINE_INTERMEDIATE_INPUT_KEY_CLASS = @@ -283,6 +283,9 @@ public class TezJobConfig { "tez.engine.intermediate-output.compress.codec"; public static final String TEZ_ENGINE_INTERMEDIATE_INPUT_COMPRESS_CODEC = "tez.engine.intermediate-input.compress.codec"; + + public static final String TEZ_ENGINE_INTERMEDIATE_INPUT_KEY_SECONDARY_COMPARATOR_CLASS = + "tez.engine.intermediate-input.key.secondary.comparator.class"; // TODO This should be in DAGConfiguration /* config for tracking the local file where all the credentials for the job http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/45ace541/tez-engine/src/main/java/org/apache/tez/engine/common/ConfigUtils.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/ConfigUtils.java b/tez-engine/src/main/java/org/apache/tez/engine/common/ConfigUtils.java index e1f496d..f14bb3e 100644 --- a/tez-engine/src/main/java/org/apache/tez/engine/common/ConfigUtils.java +++ b/tez-engine/src/main/java/org/apache/tez/engine/common/ConfigUtils.java @@ -19,9 +19,7 @@ package org.apache.tez.engine.common; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.RawComparator; -import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.io.compress.CompressionCodec; @@ -79,14 +77,10 @@ public class ConfigUtils { TezJobConfig.TEZ_ENGINE_INTERMEDIATE_OUTPUT_SHOULD_COMPRESS, false); } - // TODO Is it possible to simplify the 3-level lookup (Comparator, Map-key, Job-key) public static <V> Class<V> getIntermediateOutputValueClass(Configuration conf) { Class<V> retv = (Class<V>) conf.getClass( TezJobConfig.TEZ_ENGINE_INTERMEDIATE_OUTPUT_VALUE_CLASS, null, Object.class); - if (retv == null) { - retv = getOutputValueClass(conf); - } return retv; } @@ -94,24 +88,13 @@ public class 
ConfigUtils { Class<V> retv = (Class<V>) conf.getClass( TezJobConfig.TEZ_ENGINE_INTERMEDIATE_INPUT_VALUE_CLASS, null, Object.class); - if (retv == null) { - retv = getOutputValueClass(conf); - } return retv; } - public static <V> Class<V> getOutputValueClass(Configuration conf) { - return (Class<V>) conf.getClass( - "mapreduce.job.output.value.class", Text.class, Object.class); - } - public static <K> Class<K> getIntermediateOutputKeyClass(Configuration conf) { Class<K> retv = (Class<K>) conf.getClass( TezJobConfig.TEZ_ENGINE_INTERMEDIATE_OUTPUT_KEY_CLASS, null, Object.class); - if (retv == null) { - retv = getOutputKeyClass(conf); - } return retv; } @@ -119,20 +102,9 @@ public class ConfigUtils { Class<K> retv = (Class<K>) conf.getClass( TezJobConfig.TEZ_ENGINE_INTERMEDIATE_INPUT_KEY_CLASS, null, Object.class); - if (retv == null) { - retv = getOutputKeyClass(conf); - } return retv; } - - public static <K> Class<K> getOutputKeyClass(Configuration conf) { - return - (Class<K>) - conf.getClass( - "mapreduce.job.output.key.class", - LongWritable.class, Object.class); -} - + public static <K> RawComparator<K> getIntermediateOutputKeyComparator(Configuration conf) { Class<? extends RawComparator> theClass = conf.getClass( TezJobConfig.TEZ_ENGINE_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS, null, @@ -155,14 +127,15 @@ public class ConfigUtils { - public static <V> RawComparator<V> getOutputValueGroupingComparator( + // TODO Fix name + public static <V> RawComparator<V> getInputKeySecondaryGroupingComparator( Configuration conf) { - Class<? extends RawComparator> theClass = - conf.getClass( - "mapreduce.job.output.group.comparator.class", + Class<? 
extends RawComparator> theClass = conf + .getClass( + TezJobConfig.TEZ_ENGINE_INTERMEDIATE_INPUT_KEY_SECONDARY_COMPARATOR_CLASS, null, RawComparator.class); if (theClass == null) { - return getIntermediateOutputKeyComparator(conf); + return getIntermediateInputKeyComparator(conf); } return ReflectionUtils.newInstance(theClass, conf); http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/45ace541/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java index 9ac723f..3add31c 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java @@ -194,6 +194,8 @@ public class DeprecatedKeys { registerMRToEngineKeyTranslation(MRJobConfig.GROUP_COMPARATOR_CLASS, TezJobConfig.TEZ_ENGINE_GROUP_COMPARATOR_CLASS); + registerMRToEngineKeyTranslation(MRJobConfig.GROUP_COMPARATOR_CLASS, TezJobConfig.TEZ_ENGINE_INTERMEDIATE_INPUT_KEY_SECONDARY_COMPARATOR_CLASS); + registerMRToEngineKeyTranslation(MRJobConfig.NUM_REDUCES, TezJobConfig.TEZ_ENGINE_TASK_OUTDEGREE); registerMRToEngineKeyTranslation(MRJobConfig.NUM_MAPS, TezJobConfig.TEZ_ENGINE_TASK_INDEGREE); http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/45ace541/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java index 162e225..c444314 100644 --- 
a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java @@ -21,11 +21,17 @@ package org.apache.tez.mapreduce.hadoop; import java.util.Map; import java.util.Map.Entry; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.JobConf; +import org.apache.tez.common.TezJobConfig; import org.apache.tez.mapreduce.hadoop.DeprecatedKeys.MultiStageKeys; public class MultiStageMRConfToTezTranslator { + private static final Log LOG = LogFactory.getLog(MultiStageMRConfToTezTranslator.class); + private enum DeprecationReason { DEPRECATED_DIRECT_TRANSLATION, DEPRECATED_MULTI_STAGE } @@ -49,10 +55,10 @@ public class MultiStageMRConfToTezTranslator { int numEdges = totalStages - 1; Configuration[] allConfs = extractStageConfs(newConf, numEdges); - + for (int i = 0; i < allConfs.length; i++) { + setStageKeysFromBaseConf(allConfs[i], srcConf, Integer.toString(i)); processDirectConversion(allConfs[i]); - // XXX How are the number of reducers being set correctly in YARNRUNNER ? } for (int i = 0; i < allConfs.length - 1; i++) { processMultiStageDepreaction(allConfs[i], allConfs[i + 1]); @@ -146,4 +152,41 @@ public class MultiStageMRConfToTezTranslator { } } } -} + + /** + * Pulls in specific keys from the base configuration, if they are not set at + * the stage level. An explicit list of keys is copied over (not all), which + * require translation to tez keys. + */ + private static void setStageKeysFromBaseConf(Configuration conf, + Configuration baseConf, String stage) { + JobConf jobConf = new JobConf(baseConf); + // Don't clobber explicit tez config. 
+ if (conf.get(TezJobConfig.TEZ_ENGINE_INTERMEDIATE_INPUT_KEY_CLASS) == null + && conf.get(TezJobConfig.TEZ_ENGINE_INTERMEDIATE_OUTPUT_KEY_CLASS) == null) { + // If this is set, but the comparator is not set, and their types differ - + // the job will break. + if (conf.get(MRJobConfig.MAP_OUTPUT_KEY_CLASS) == null) { + // Pull this in from the baseConf + conf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS, jobConf + .getMapOutputKeyClass().getName()); + LOG.info("XXX: Setting " + MRJobConfig.MAP_OUTPUT_KEY_CLASS + + " for stage: " + stage + + " based on job level configuration. Value: " + + conf.get(MRJobConfig.MAP_OUTPUT_KEY_CLASS)); + } + } + + if (conf.get(TezJobConfig.TEZ_ENGINE_INTERMEDIATE_INPUT_VALUE_CLASS) == null + && conf.get(TezJobConfig.TEZ_ENGINE_INTERMEDIATE_OUTPUT_VALUE_CLASS) == null) { + if (conf.get(MRJobConfig.MAP_OUTPUT_VALUE_CLASS) == null) { + conf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS, jobConf + .getMapOutputValueClass().getName()); + LOG.info("XXX: Setting " + MRJobConfig.MAP_OUTPUT_VALUE_CLASS + + " for stage: " + stage + + " based on job level configuration. 
Value: " + + conf.get(MRJobConfig.MAP_OUTPUT_VALUE_CLASS)); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/45ace541/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java index b74f952..9ae07ee 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java @@ -129,8 +129,11 @@ implements Processor { Class keyClass = ConfigUtils.getIntermediateInputKeyClass(jobConf); Class valueClass = ConfigUtils.getIntermediateInputValueClass(jobConf); + LOG.info("Using keyClass: " + keyClass); + LOG.info("Using valueClass: " + valueClass); RawComparator comparator = - ConfigUtils.getOutputValueGroupingComparator(jobConf); + ConfigUtils.getInputKeySecondaryGroupingComparator(jobConf); + LOG.info("Using comparator: " + comparator); reduceInputKeyCounter = reporter.getCounter(TaskCounter.REDUCE_INPUT_GROUPS);
