Repository: tez Updated Branches: refs/heads/branch-0.9 c3c450ad9 -> 7b145c827
TEZ-3824. MRCombiner creates new JobConf copy per spill (Jonathan Eagles via jlowe) (cherry picked from commit 1ae62421aa6c3d9131e1673615b1c94c8e7011aa) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/7b145c82 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/7b145c82 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/7b145c82 Branch: refs/heads/branch-0.9 Commit: 7b145c827a54c7857309d01282516c4d782f29a5 Parents: c3c450a Author: Jason Lowe <[email protected]> Authored: Mon May 14 13:17:43 2018 -0500 Committer: Jason Lowe <[email protected]> Committed: Mon May 14 13:19:26 2018 -0500 ---------------------------------------------------------------------- .../org/apache/tez/mapreduce/combine/MRCombiner.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/7b145c82/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/combine/MRCombiner.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/combine/MRCombiner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/combine/MRCombiner.java index 9514215..adfd24d 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/combine/MRCombiner.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/combine/MRCombiner.java @@ -20,6 +20,7 @@ package org.apache.tez.mapreduce.combine; import java.io.IOException; +import org.apache.hadoop.mapred.JobConf; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Public; @@ -78,7 +79,13 @@ public class MRCombiner implements Combiner { private final TaskAttemptID mrTaskAttemptID; public MRCombiner(TaskContext taskContext) throws IOException { - this.conf = TezUtils.createConfFromUserPayload(taskContext.getUserPayload()); + final Configuration userConf = TezUtils.createConfFromUserPayload(taskContext.getUserPayload()); + useNewApi = ConfigUtils.useNewApi(userConf); + if (useNewApi) { + conf = new JobConf(userConf); + } else { + conf = userConf; + } assert(taskContext instanceof InputContext || taskContext instanceof OutputContext); if (taskContext instanceof OutputContext) { @@ -93,8 +100,6 @@ public class MRCombiner implements Combiner { this.reporter = new MRTaskReporter((InputContext)taskContext); } - this.useNewApi = ConfigUtils.useNewApi(conf); - combineInputRecordsCounter = taskContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS); combineOutputRecordsCounter = taskContext.getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
