Repository: crunch
Updated Branches:
  refs/heads/master 04e02516a -> 8c35a1399
CRUNCH-502: Fix interface/context differences in the CrunchOutputFormat wrapper code

Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/8c35a139
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/8c35a139
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/8c35a139

Branch: refs/heads/master
Commit: 8c35a13996eba01671ce7c7c6f369b180bdd408b
Parents: 04e0251
Author: Josh Wills <[email protected]>
Authored: Mon Mar 2 14:35:02 2015 -0800
Committer: Josh Wills <[email protected]>
Committed: Tue Mar 3 13:16:08 2015 -0800

----------------------------------------------------------------------
 .../java/org/apache/crunch/io/CrunchOutputs.java | 19 +++++++------------
 1 file changed, 7 insertions(+), 12 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/crunch/blob/8c35a139/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java b/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
index 57fe139..0d06931 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
@@ -18,13 +18,11 @@
 package org.apache.crunch.io;
 
 import com.google.common.collect.Sets;
-import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import org.apache.crunch.CrunchRuntimeException;
 import org.apache.crunch.hadoop.mapreduce.TaskAttemptContextFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobID;
@@ -46,7 +44,6 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.UUID;
 
 /**
  * An analogue of {@link CrunchInputs} for handling multiple {@code OutputFormat} instances
@@ -82,7 +79,7 @@ public class CrunchOutputs<K, V> {
       String namedOutput = e.getKey();
       Job job = getJob(jc.getJobID(), e.getKey(), jc.getConfiguration());
       OutputFormat fmt = getOutputFormat(namedOutput, job, e.getValue());
-      fmt.checkOutputSpecs(jc);
+      fmt.checkOutputSpecs(job);
     }
   }
 
@@ -93,8 +90,7 @@ public class CrunchOutputs<K, V> {
       String namedOutput = e.getKey();
       Job job = getJob(tac.getJobID(), e.getKey(), tac.getConfiguration());
       OutputFormat fmt = getOutputFormat(namedOutput, job, e.getValue());
-      TaskAttemptContext taskContext = TaskAttemptContextFactory.create(
-          job.getConfiguration(), tac.getTaskAttemptID());
+      TaskAttemptContext taskContext = getTaskContext(tac, job);
       OutputCommitter oc = fmt.getOutputCommitter(taskContext);
       committers.put(namedOutput, oc);
     }
@@ -204,7 +200,6 @@ public class CrunchOutputs<K, V> {
 
     if (baseContext != null) {
       taskContext = getTaskContext(baseContext, job);
-
       recordWriter = fmt.getRecordWriter(taskContext);
     }
     OutputState<K, V> outputState = new OutputState(taskContext, recordWriter);
@@ -371,16 +366,16 @@ public class CrunchOutputs<K, V> {
       Set<Path> handledPaths = Sets.newHashSet();
       for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) {
         OutputCommitter oc = e.getValue();
+        Job job = getJob(jobContext.getJobID(), e.getKey(), conf);
+        configureJob(e.getKey(), job, outputs.get(e.getKey()));
         if (oc instanceof FileOutputCommitter) {
-          Path workPath = ((FileOutputCommitter) oc).getWorkPath();
-          if (handledPaths.contains(workPath)) {
+          Path outputPath = ((FileOutputCommitter) oc).getWorkPath().getParent();
+          if (handledPaths.contains(outputPath)) {
             continue;
           } else {
-            handledPaths.add(workPath);
+            handledPaths.add(outputPath);
           }
         }
-        Job job = getJob(jobContext.getJobID(), e.getKey(), conf);
-        configureJob(e.getKey(), job, outputs.get(e.getKey()));
         oc.commitJob(job);
       }
     }
