Repository: tez Updated Branches: refs/heads/master 60645a825 -> 1ae62421a
TEZ-3824. MRCombiner creates new JobConf copy per spill (Jonathan Eagles via jlowe) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/1ae62421 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/1ae62421 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/1ae62421 Branch: refs/heads/master Commit: 1ae62421aa6c3d9131e1673615b1c94c8e7011aa Parents: 60645a8 Author: Jason Lowe <[email protected]> Authored: Mon May 14 13:17:43 2018 -0500 Committer: Jason Lowe <[email protected]> Committed: Mon May 14 13:17:43 2018 -0500 ---------------------------------------------------------------------- .../org/apache/tez/mapreduce/combine/MRCombiner.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/1ae62421/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/combine/MRCombiner.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/combine/MRCombiner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/combine/MRCombiner.java index 9514215..adfd24d 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/combine/MRCombiner.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/combine/MRCombiner.java @@ -20,6 +20,7 @@ package org.apache.tez.mapreduce.combine; import java.io.IOException; +import org.apache.hadoop.mapred.JobConf; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Public; @@ -78,7 +79,13 @@ public class MRCombiner implements Combiner { private final TaskAttemptID mrTaskAttemptID; public MRCombiner(TaskContext taskContext) throws IOException { - this.conf = TezUtils.createConfFromUserPayload(taskContext.getUserPayload()); + final Configuration userConf = TezUtils.createConfFromUserPayload(taskContext.getUserPayload()); + useNewApi = ConfigUtils.useNewApi(userConf); + if (useNewApi) { + conf = new JobConf(userConf); + } else { + conf = userConf; + } assert(taskContext instanceof InputContext || taskContext instanceof OutputContext); if (taskContext instanceof OutputContext) { @@ -93,8 +100,6 @@ public class MRCombiner implements Combiner { this.reporter = new MRTaskReporter((InputContext)taskContext); } - this.useNewApi = ConfigUtils.useNewApi(conf); - combineInputRecordsCounter = taskContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS); combineOutputRecordsCounter = taskContext.getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
