Repository: crunch
Updated Branches:
  refs/heads/master dbd56e638 -> fcf901cbb

CRUNCH-481. Support independent output committers for multiple outputs. Re-added after this was inadvertently dropped, and updated to fix null job ID.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/fcf901cb
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/fcf901cb
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/fcf901cb

Branch: refs/heads/master
Commit: fcf901cbb437faf65cae6d1fb4c3513084fe4186
Parents: dbd56e6
Author: Tom White <[email protected]>
Authored: Fri Feb 13 14:48:38 2015 +0000
Committer: Tom White <[email protected]>
Committed: Fri Feb 13 14:48:38 2015 +0000

----------------------------------------------------------------------
 .../crunch/impl/mr/plan/JobPrototype.java       |   2 +
 .../crunch/impl/mr/run/CrunchOutputFormat.java  |  54 ++++
 .../org/apache/crunch/io/CrunchOutputs.java     | 292 ++++++++++++++++---
 3 files changed, 306 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/fcf901cb/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
index d341184..2863e00 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
@@ -34,6 +34,7 @@ import org.apache.crunch.impl.mr.exec.CrunchJobHooks;
 import org.apache.crunch.impl.mr.run.CrunchCombiner;
 import org.apache.crunch.impl.mr.run.CrunchInputFormat;
 import org.apache.crunch.impl.mr.run.CrunchMapper;
+import org.apache.crunch.impl.mr.run.CrunchOutputFormat;
 import org.apache.crunch.impl.mr.run.CrunchReducer;
 import org.apache.crunch.impl.mr.run.NodeContext;
 import org.apache.crunch.impl.mr.run.RTNode;
@@ -214,6 +215,7 @@ class JobPrototype {
       job.setNumReduceTasks(0);
       inputNodes = Lists.newArrayList(outputNodes);
     }
+    job.setOutputFormatClass(CrunchOutputFormat.class);
     serialize(inputNodes, conf, workingPath, NodeContext.MAP);
 
     if (inputNodes.size() == 1) {
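
For orientation: when a Crunch pipeline writes to more than one Target, the planner can pack several named outputs into a single MapReduce job, and before this change they all shared one output committer. A minimal sketch of such a pipeline follows (illustrative only; the paths and class name are not part of this commit):

    import org.apache.crunch.PCollection;
    import org.apache.crunch.Pipeline;
    import org.apache.crunch.impl.mr.MRPipeline;
    import org.apache.crunch.io.To;
    import org.apache.hadoop.conf.Configuration;

    public class TwoOutputsExample {
      public static void main(String[] args) {
        // One logical pipeline, two write targets: the planner may place both
        // named outputs in the same MapReduce job, and with this change each
        // one gets its own OutputCommitter via the CrunchOutputFormat below.
        Pipeline pipeline = new MRPipeline(TwoOutputsExample.class, new Configuration());
        PCollection<String> lines = pipeline.readTextFile("/in/events");
        pipeline.write(lines, To.textFile("/out/copy1"));
        pipeline.write(lines, To.textFile("/out/copy2"));
        pipeline.done();
      }
    }
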
http://git-wip-us.apache.org/repos/asf/crunch/blob/fcf901cb/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchOutputFormat.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchOutputFormat.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchOutputFormat.java
new file mode 100644
index 0000000..bd9cdc9
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchOutputFormat.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.run;
+
+import org.apache.crunch.io.CrunchOutputs;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+public class CrunchOutputFormat<K, V> extends OutputFormat<K, V> {
+  @Override
+  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext taskAttemptContext)
+      throws IOException, InterruptedException {
+    return new RecordWriter<K, V>() {
+      @Override
+      public void write(K k, V v) throws IOException, InterruptedException {
+      }
+
+      @Override
+      public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+      }
+    };
+  }
+
+  @Override
+  public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException {
+    CrunchOutputs.checkOutputSpecs(jobContext);
+  }
+
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext taskAttemptContext)
+      throws IOException, InterruptedException {
+    return CrunchOutputs.getOutputCommitter(taskAttemptContext);
+  }
+}
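
CrunchOutputFormat deliberately hands back a no-op RecordWriter: records are written through CrunchOutputs inside the map and reduce tasks, and the format class exists to surface spec checking and committer creation to the MapReduce framework. A simplified sketch of the order in which the framework drives these hooks (illustrative pseudocode of the commit protocol, not Hadoop's actual submission code):

    import java.io.IOException;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.OutputCommitter;
    import org.apache.hadoop.mapreduce.OutputFormat;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    public class CommitProtocolSketch {
      static <K, V> void drive(OutputFormat<K, V> fmt, JobContext job, TaskAttemptContext task)
          throws IOException, InterruptedException {
        fmt.checkOutputSpecs(job);                         // at submission, fans out to every named output
        OutputCommitter oc = fmt.getOutputCommitter(task); // the CompositeOutputCommitter added below
        oc.setupJob(job);
        oc.setupTask(task);
        // ... the task writes records through CrunchOutputs, not the no-op RecordWriter ...
        if (oc.needsTaskCommit(task)) {
          oc.commitTask(task);
        }
        oc.commitJob(job);
      }
    }
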
http://git-wip-us.apache.org/repos/asf/crunch/blob/fcf901cb/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java b/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
index e811bcf..57fe139 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
@@ -17,14 +17,24 @@
  */
 package org.apache.crunch.io;
 
+import com.google.common.collect.Sets;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import org.apache.crunch.CrunchRuntimeException;
 import org.apache.crunch.hadoop.mapreduce.TaskAttemptContextFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.util.ReflectionUtils;
 
 import com.google.common.base.Joiner;
@@ -35,6 +45,8 @@ import com.google.common.collect.Maps;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
 
 /**
  * An analogue of {@link CrunchInputs} for handling multiple {@code OutputFormat} instances
@@ -64,6 +76,31 @@ public class CrunchOutputs<K, V> {
     conf.set(CRUNCH_OUTPUTS, existing == null ?
         inputs : existing + RECORD_SEP + inputs);
   }
 
+  public static void checkOutputSpecs(JobContext jc) throws IOException, InterruptedException {
+    Map<String, OutputConfig> outputs = getNamedOutputs(jc.getConfiguration());
+    for (Map.Entry<String, OutputConfig> e : outputs.entrySet()) {
+      String namedOutput = e.getKey();
+      Job job = getJob(jc.getJobID(), e.getKey(), jc.getConfiguration());
+      OutputFormat fmt = getOutputFormat(namedOutput, job, e.getValue());
+      fmt.checkOutputSpecs(jc);
+    }
+  }
+
+  public static OutputCommitter getOutputCommitter(TaskAttemptContext tac) throws IOException, InterruptedException {
+    Map<String, OutputConfig> outputs = getNamedOutputs(tac.getConfiguration());
+    Map<String, OutputCommitter> committers = Maps.newHashMap();
+    for (Map.Entry<String, OutputConfig> e : outputs.entrySet()) {
+      String namedOutput = e.getKey();
+      Job job = getJob(tac.getJobID(), e.getKey(), tac.getConfiguration());
+      OutputFormat fmt = getOutputFormat(namedOutput, job, e.getValue());
+      TaskAttemptContext taskContext = TaskAttemptContextFactory.create(
+          job.getConfiguration(), tac.getTaskAttemptID());
+      OutputCommitter oc = fmt.getOutputCommitter(taskContext);
+      committers.put(namedOutput, oc);
+    }
+    return new CompositeOutputCommitter(outputs, committers);
+  }
+
   public static class OutputConfig<K, V> {
     public FormatBundle<OutputFormat<K, V>> bundle;
     public Class<K> keyClass;
@@ -84,6 +121,10 @@ public class CrunchOutputs<K, V> {
   
   public static Map<String, OutputConfig> getNamedOutputs(Configuration conf) {
     Map<String, OutputConfig> out = Maps.newHashMap();
+    String serOut = conf.get(CRUNCH_OUTPUTS);
+    if (serOut == null || serOut.isEmpty()) {
+      return out;
+    }
     for (String input : Splitter.on(RECORD_SEP).split(conf.get(CRUNCH_OUTPUTS))) {
       List<String> fields = Lists.newArrayList(SPLITTER.split(input));
       String name = fields.get(0);
@@ -101,10 +142,10 @@ public class CrunchOutputs<K, V> {
   private static final String BASE_OUTPUT_NAME = "mapreduce.output.basename";
   private static final String COUNTERS_GROUP = CrunchOutputs.class.getName();
   
-  private final TaskInputOutputContext<?, ?, K, V> baseContext;
+  private TaskInputOutputContext<?, ?, K, V> baseContext;
+  private Configuration baseConf;
   private final Map<String, OutputConfig> namedOutputs;
-  private final Map<String, RecordWriter<K, V>> recordWriters;
-  private final Map<String, TaskAttemptContext> taskContextCache;
+  private final Map<String, OutputState<K, V>> outputStates;
   private final boolean disableOutputCounters;
   
   /**
@@ -114,11 +155,15 @@ public class CrunchOutputs<K, V> {
    * @param context the TaskInputOutputContext object
    */
   public CrunchOutputs(TaskInputOutputContext<?, ?, K, V> context) {
+    this(context.getConfiguration());
     this.baseContext = context;
-    namedOutputs = getNamedOutputs(context);
-    recordWriters = Maps.newHashMap();
-    taskContextCache = Maps.newHashMap();
-    this.disableOutputCounters = context.getConfiguration().getBoolean(CRUNCH_DISABLE_OUTPUT_COUNTERS, false);
+  }
+
+  public CrunchOutputs(Configuration conf) {
+    this.baseConf = conf;
+    this.namedOutputs = getNamedOutputs(conf);
+    this.outputStates = Maps.newHashMap();
+    this.disableOutputCounters = conf.getBoolean(CRUNCH_DISABLE_OUTPUT_COUNTERS, false);
   }
 
   @SuppressWarnings("unchecked")
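
The new Configuration-only constructor is what makes the static checkOutputSpecs and getOutputCommitter entry points above possible, since neither runs inside a task where a TaskInputOutputContext exists. A sketch of the round trip follows (it assumes the existing addNamedOutput and FormatBundle.forOutput helpers in crunch-core; treat the exact signatures as assumptions rather than part of this commit):

    import java.util.Map;
    import org.apache.crunch.io.CrunchOutputs;
    import org.apache.crunch.io.FormatBundle;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    public class NamedOutputRoundTrip {
      public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration());
        // Registering a named output serializes it under the CRUNCH_OUTPUTS key.
        CrunchOutputs.addNamedOutput(job, "text1",
            FormatBundle.forOutput(TextOutputFormat.class), Text.class, LongWritable.class);
        // checkOutputSpecs and getOutputCommitter recover the same view from the
        // bare Configuration, with no task context involved.
        Map<String, CrunchOutputs.OutputConfig> named =
            CrunchOutputs.getNamedOutputs(job.getConfiguration());
        System.out.println(named.keySet()); // [text1]
      }
    }
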
@@ -128,63 +173,226 @@ public class CrunchOutputs<K, V> {
       throw new IllegalArgumentException("Undefined named output '" + namedOutput + "'");
     }
-    TaskAttemptContext taskContext = getContext(namedOutput);
     if (!disableOutputCounters) {
       baseContext.getCounter(COUNTERS_GROUP,
           namedOutput).increment(1);
     }
-    getRecordWriter(taskContext, namedOutput).write(key, value);
+    getOutputState(namedOutput).write(key, value);
   }
 
   public void close() throws IOException, InterruptedException {
-    for (RecordWriter<?, ?> writer : recordWriters.values()) {
-      writer.close(baseContext);
+    for (OutputState<?, ?> out : outputStates.values()) {
+      out.close();
     }
   }
-  
-  private TaskAttemptContext getContext(String nameOutput) throws IOException {
-    TaskAttemptContext taskContext = taskContextCache.get(nameOutput);
-    if (taskContext != null) {
-      return taskContext;
+
+  private OutputState<K, V> getOutputState(String namedOutput) throws IOException, InterruptedException {
+    OutputState<?, ?> out = outputStates.get(namedOutput);
+    if (out != null) {
+      return (OutputState<K, V>) out;
     }
 
     // The following trick leverages the instantiation of a record writer via
     // the job thus supporting arbitrary output formats.
-    OutputConfig outConfig = namedOutputs.get(nameOutput);
-    Configuration conf = new Configuration(baseContext.getConfiguration());
-    Job job = new Job(conf);
-    job.getConfiguration().set("crunch.namedoutput", nameOutput);
+    Job job = getJob(baseContext.getJobID(), namedOutput, baseConf);
+
+    // Get a job with the expected named output.
+    job = getJob(job.getJobID(), namedOutput, baseConf);
+
+    OutputFormat<K, V> fmt = getOutputFormat(namedOutput, job, namedOutputs.get(namedOutput));
+    TaskAttemptContext taskContext = null;
+    RecordWriter<K, V> recordWriter = null;
+
+    if (baseContext != null) {
+      taskContext = getTaskContext(baseContext, job);
+
+      recordWriter = fmt.getRecordWriter(taskContext);
+    }
+    OutputState<K, V> outputState = new OutputState(taskContext, recordWriter);
+    this.outputStates.put(namedOutput, outputState);
+    return outputState;
+  }
+
+  private static Job getJob(JobID jobID, String namedOutput, Configuration baseConf)
+      throws IOException {
+    Job job = new Job(new Configuration(baseConf));
+    job.getConfiguration().set("crunch.namedoutput", namedOutput);
+    setJobID(job, jobID, namedOutput);
+    return job;
+  }
+
+  private static TaskAttemptContext getTaskContext(TaskAttemptContext baseContext, Job job) {
+
+    org.apache.hadoop.mapreduce.TaskAttemptID baseTaskId = baseContext.getTaskAttemptID();
+
+    // Create a task ID context with our specialized job ID.
+    org.apache.hadoop.mapreduce.TaskAttemptID taskId;
+    taskId = new org.apache.hadoop.mapreduce.TaskAttemptID(job.getJobID().getJtIdentifier(),
+        job.getJobID().getId(),
+        baseTaskId.isMap(),
+        baseTaskId.getTaskID().getId(),
+        baseTaskId.getId());
+
+    return TaskAttemptContextFactory.create(
+        job.getConfiguration(), taskId);
+  }
+
+  private static void setJobID(Job job, JobID jobID, String namedOutput) {
+    Method setJobIDMethod;
+    JobID newJobID = jobID;
+    try {
+      // Hadoop 2
+      setJobIDMethod = Job.class.getMethod("setJobID", JobID.class);
+      // Add the named output to the job ID, since that is used by some output formats
+      // to create temporary outputs.
+      newJobID = jobID == null || jobID.getJtIdentifier().contains(namedOutput) ?
+          jobID :
+          new JobID(jobID.getJtIdentifier() + "_" + namedOutput, jobID.getId());
+    } catch (NoSuchMethodException e) {
+      // Hadoop 1's setJobID method is package private and declared by JobContext
+      try {
+        setJobIDMethod = JobContext.class.getDeclaredMethod("setJobID", JobID.class);
+      } catch (NoSuchMethodException e1) {
+        throw new CrunchRuntimeException(e);
+      }
+      setJobIDMethod.setAccessible(true);
+    }
+    try {
+      setJobIDMethod.invoke(job, newJobID);
+    } catch (Exception e) {
+      throw new CrunchRuntimeException("Could not set job ID to " + jobID, e);
+    }
+  }
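
The setJobID method above is where the null-job-ID fix from the commit message lives: a null ID is now passed through rather than dereferenced, and a non-null ID has the named output folded into its jtIdentifier, so output formats that derive temporary paths from the job ID see a distinct ID per named output. A standalone illustration of the decoration (identifier and sequence number are invented):

    import org.apache.hadoop.mapreduce.JobID;

    public class JobIdDecoration {
      public static void main(String[] args) {
        JobID jobID = new JobID("20150213144838", 7);
        String namedOutput = "out1";
        // Same guard as the patch: null passes through, and an identifier that
        // already contains the named output is not decorated twice.
        JobID newJobID = jobID == null || jobID.getJtIdentifier().contains(namedOutput)
            ? jobID
            : new JobID(jobID.getJtIdentifier() + "_" + namedOutput, jobID.getId());
        System.out.println(newJobID); // job_20150213144838_out1_0007
      }
    }
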
+
+  private static void configureJob(
+      String namedOutput,
+      Job job,
+      OutputConfig outConfig) throws IOException {
+    job.getConfiguration().set(BASE_OUTPUT_NAME, namedOutput);
     job.setOutputFormatClass(outConfig.bundle.getFormatClass());
     job.setOutputKeyClass(outConfig.keyClass);
     job.setOutputValueClass(outConfig.valueClass);
     outConfig.bundle.configure(job.getConfiguration());
-    taskContext = TaskAttemptContextFactory.create(
-        job.getConfiguration(), baseContext.getTaskAttemptID());
+  }
 
-    taskContextCache.put(nameOutput, taskContext);
-    return taskContext;
+  private static OutputFormat getOutputFormat(
+      String namedOutput,
+      Job job,
+      OutputConfig outConfig) throws IOException {
+    configureJob(namedOutput, job, outConfig);
+    try {
+      return ReflectionUtils.newInstance(
+          job.getOutputFormatClass(),
+          job.getConfiguration());
+    } catch (ClassNotFoundException e) {
+      throw new IOException(e);
+    }
   }
 
-  private synchronized RecordWriter<K, V> getRecordWriter(
-      TaskAttemptContext taskContext, String namedOutput)
-      throws IOException, InterruptedException {
-    // look for record-writer in the cache
-    RecordWriter<K, V> writer = recordWriters.get(namedOutput);
+  private static class OutputState<K, V> {
+    private final TaskAttemptContext context;
+    private final RecordWriter<K, V> recordWriter;
 
-    // If not in cache, create a new one
-    if (writer == null) {
-      // get the record writer from context output format
-      taskContext.getConfiguration().set(BASE_OUTPUT_NAME, namedOutput);
-      try {
-        OutputFormat format = ReflectionUtils.newInstance(
-            taskContext.getOutputFormatClass(),
-            taskContext.getConfiguration());
-        writer = format.getRecordWriter(taskContext);
-      } catch (ClassNotFoundException e) {
-        throw new IOException(e);
+    public OutputState(TaskAttemptContext context, RecordWriter<K, V> recordWriter) {
+      this.context = context;
+      this.recordWriter = recordWriter;
+    }
+
+    public void write(K key, V value) throws IOException, InterruptedException {
+      recordWriter.write(key, value);
+    }
+
+    public void close() throws IOException, InterruptedException {
+      recordWriter.close(context);
+    }
+  }
+
+  private static class CompositeOutputCommitter extends OutputCommitter {
+
+    private final Map<String, OutputConfig> outputs;
+    private final Map<String, OutputCommitter> committers;
+
+    public CompositeOutputCommitter(Map<String, OutputConfig> outputs, Map<String, OutputCommitter> committers) {
+      this.outputs = outputs;
+      this.committers = committers;
+    }
+
+    private TaskAttemptContext getContext(String namedOutput, TaskAttemptContext baseContext) throws IOException {
+      Job job = getJob(baseContext.getJobID(), namedOutput, baseContext.getConfiguration());
+      configureJob(namedOutput, job, outputs.get(namedOutput));
+
+      return getTaskContext(baseContext, job);
+    }
+
+    @Override
+    public void setupJob(JobContext jobContext) throws IOException {
+      Configuration conf = jobContext.getConfiguration();
+      for (Map.Entry<String, OutputCommitter> e :
+          committers.entrySet()) {
+        Job job = getJob(jobContext.getJobID(), e.getKey(), conf);
+        configureJob(e.getKey(), job, outputs.get(e.getKey()));
+        e.getValue().setupJob(job);
+      }
+    }
+
+    @Override
+    public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException {
+      for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) {
+        e.getValue().setupTask(getContext(e.getKey(), taskAttemptContext));
+      }
+    }
+
+    @Override
+    public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOException {
+      for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) {
+        if (e.getValue().needsTaskCommit(getContext(e.getKey(), taskAttemptContext))) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    @Override
+    public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException {
+      for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) {
+
+        e.getValue().commitTask(getContext(e.getKey(), taskAttemptContext));
       }
-      recordWriters.put(namedOutput, writer);
     }
-    return writer;
+
+    @Override
+    public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException {
+      for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) {
+        e.getValue().abortTask(getContext(e.getKey(), taskAttemptContext));
       }
+    }
+
+    @Override
+    public void commitJob(JobContext jobContext) throws IOException {
+      Configuration conf = jobContext.getConfiguration();
+      Set<Path> handledPaths = Sets.newHashSet();
+      for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) {
+        OutputCommitter oc = e.getValue();
+        if (oc instanceof FileOutputCommitter) {
+          Path workPath = ((FileOutputCommitter) oc).getWorkPath();
+          if (handledPaths.contains(workPath)) {
+            continue;
+          } else {
+            handledPaths.add(workPath);
+          }
+        }
+        Job job = getJob(jobContext.getJobID(), e.getKey(), conf);
+        configureJob(e.getKey(), job, outputs.get(e.getKey()));
+        oc.commitJob(job);
+      }
+    }
+
+    @Override
+    public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException {
+      Configuration conf = jobContext.getConfiguration();
+      for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) {
+        Job job = getJob(jobContext.getJobID(), e.getKey(), conf);
+        configureJob(e.getKey(), job, outputs.get(e.getKey()));
+        e.getValue().abortJob(job, state);
+      }
+    }
   }
 }
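
One subtlety in commitJob above: two named outputs can resolve to FileOutputCommitters that share a work path (for example, when multiple outputs land under one output directory), and committing the same temporary directory twice would be redundant at best. The handledPaths set ensures each work path is promoted once; a standalone illustration with invented paths:

    import com.google.common.collect.Sets;
    import java.util.Set;
    import org.apache.hadoop.fs.Path;

    public class WorkPathDedup {
      public static void main(String[] args) {
        Set<Path> handledPaths = Sets.newHashSet();
        Path[] workPaths = {
            new Path("/tmp/crunch-1234/out/_temporary/0"),
            new Path("/tmp/crunch-1234/out/_temporary/0"), // shared with the first: skipped
            new Path("/tmp/crunch-5678/out2/_temporary/0")
        };
        for (Path workPath : workPaths) {
          if (handledPaths.contains(workPath)) {
            continue; // same de-duplication test as commitJob above
          }
          handledPaths.add(workPath);
          System.out.println("commitJob for " + workPath);
        }
      }
    }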
