Repository: crunch Updated Branches: refs/heads/master 2c7821fd3 -> d4f23c42c
CRUNCH-481: Support independent output committers for multiple outputs Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/d4f23c42 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/d4f23c42 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/d4f23c42 Branch: refs/heads/master Commit: d4f23c42c6d215a07ecfea74b1b6cddbc1537eeb Parents: 2c7821f Author: Josh Wills <[email protected]> Authored: Sat Nov 29 16:21:15 2014 -0800 Committer: Josh Wills <[email protected]> Committed: Sat Nov 29 16:21:34 2014 -0800 ---------------------------------------------------------------------- .../crunch/impl/mr/plan/JobPrototype.java | 2 + .../crunch/impl/mr/run/CrunchOutputFormat.java | 54 ++++ .../org/apache/crunch/io/CrunchOutputs.java | 259 +++++++++++++++---- 3 files changed, 260 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/d4f23c42/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java index d341184..2863e00 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java @@ -34,6 +34,7 @@ import org.apache.crunch.impl.mr.exec.CrunchJobHooks; import org.apache.crunch.impl.mr.run.CrunchCombiner; import org.apache.crunch.impl.mr.run.CrunchInputFormat; import org.apache.crunch.impl.mr.run.CrunchMapper; +import org.apache.crunch.impl.mr.run.CrunchOutputFormat; import org.apache.crunch.impl.mr.run.CrunchReducer; import org.apache.crunch.impl.mr.run.NodeContext; import org.apache.crunch.impl.mr.run.RTNode; @@ -214,6 +215,7 @@ class JobPrototype { job.setNumReduceTasks(0); inputNodes = Lists.newArrayList(outputNodes); } + job.setOutputFormatClass(CrunchOutputFormat.class); serialize(inputNodes, conf, workingPath, NodeContext.MAP); if (inputNodes.size() == 1) { http://git-wip-us.apache.org/repos/asf/crunch/blob/d4f23c42/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchOutputFormat.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchOutputFormat.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchOutputFormat.java new file mode 100644 index 0000000..bd9cdc9 --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchOutputFormat.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.mr.run; + +import org.apache.crunch.io.CrunchOutputs; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.IOException; + +public class CrunchOutputFormat<K, V> extends OutputFormat<K, V> { + @Override + public RecordWriter<K, V> getRecordWriter(TaskAttemptContext taskAttemptContext) + throws IOException, InterruptedException { + return new RecordWriter<K, V>() { + @Override + public void write(K k, V v) throws IOException, InterruptedException { + } + + @Override + public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { + } + }; + } + + @Override + public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException { + CrunchOutputs.checkOutputSpecs(jobContext); + } + + @Override + public OutputCommitter getOutputCommitter(TaskAttemptContext taskAttemptContext) + throws IOException, InterruptedException { + return CrunchOutputs.getOutputCommitter(taskAttemptContext); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/d4f23c42/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java b/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java index 55fcc89..a536b38 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java @@ -17,14 +17,20 @@ */ package org.apache.crunch.io; +import com.google.common.collect.Sets; import org.apache.crunch.CrunchRuntimeException; import org.apache.crunch.hadoop.mapreduce.TaskAttemptContextFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskInputOutputContext; +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hadoop.util.ReflectionUtils; import com.google.common.base.Joiner; @@ -35,6 +41,7 @@ import com.google.common.collect.Maps; import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Set; /** * An analogue of {@link CrunchInputs} for handling multiple {@code OutputFormat} instances @@ -63,7 +70,32 @@ public class CrunchOutputs<K, V> { String existing = conf.get(CRUNCH_OUTPUTS); conf.set(CRUNCH_OUTPUTS, existing == null ? inputs : existing + RECORD_SEP + inputs); } - + + public static void checkOutputSpecs(JobContext jc) throws IOException, InterruptedException { + Map<String, OutputConfig> outputs = getNamedOutputs(jc.getConfiguration()); + for (Map.Entry<String, OutputConfig> e : outputs.entrySet()) { + String namedOutput = e.getKey(); + Job job = getJob(e.getKey(), jc.getConfiguration()); + OutputFormat fmt = getOutputFormat(namedOutput, job, e.getValue()); + fmt.checkOutputSpecs(jc); + } + } + + public static OutputCommitter getOutputCommitter(TaskAttemptContext tac) throws IOException, InterruptedException { + Map<String, OutputConfig> outputs = getNamedOutputs(tac.getConfiguration()); + Map<String, OutputCommitter> committers = Maps.newHashMap(); + for (Map.Entry<String, OutputConfig> e : outputs.entrySet()) { + String namedOutput = e.getKey(); + Job job = getJob(e.getKey(), tac.getConfiguration()); + OutputFormat fmt = getOutputFormat(namedOutput, job, e.getValue()); + TaskAttemptContext taskContext = TaskAttemptContextFactory.create( + job.getConfiguration(), tac.getTaskAttemptID()); + OutputCommitter oc = fmt.getOutputCommitter(taskContext); + committers.put(namedOutput, oc); + } + return new CompositeOutputCommitter(outputs, committers); + } + private static class OutputConfig<K, V> { public FormatBundle<OutputFormat<K, V>> bundle; public Class<K> keyClass; @@ -77,11 +109,13 @@ public class CrunchOutputs<K, V> { } } - private static Map<String, OutputConfig> getNamedOutputs( - TaskInputOutputContext<?, ?, ?, ?> context) { + private static Map<String, OutputConfig> getNamedOutputs(Configuration conf) { Map<String, OutputConfig> out = Maps.newHashMap(); - Configuration conf = context.getConfiguration(); - for (String input : Splitter.on(RECORD_SEP).split(conf.get(CRUNCH_OUTPUTS))) { + String serOut = conf.get(CRUNCH_OUTPUTS); + if (serOut == null || serOut.isEmpty()) { + return out; + } + for (String input : Splitter.on(RECORD_SEP).split(serOut)) { List<String> fields = Lists.newArrayList(SPLITTER.split(input)); String name = fields.get(0); FormatBundle<OutputFormat> bundle = FormatBundle.fromSerialized(fields.get(1), conf); @@ -99,10 +133,10 @@ public class CrunchOutputs<K, V> { private static final String BASE_OUTPUT_NAME = "mapreduce.output.basename"; private static final String COUNTERS_GROUP = CrunchOutputs.class.getName(); - private final TaskInputOutputContext<?, ?, K, V> baseContext; + private TaskInputOutputContext<?, ?, K, V> baseContext; + private Configuration baseConf; private final Map<String, OutputConfig> namedOutputs; - private final Map<String, RecordWriter<K, V>> recordWriters; - private final Map<String, TaskAttemptContext> taskContextCache; + private final Map<String, OutputState<K, V>> outputStates; private final boolean disableOutputCounters; /** @@ -112,13 +146,17 @@ public class CrunchOutputs<K, V> { * @param context the TaskInputOutputContext object */ public CrunchOutputs(TaskInputOutputContext<?, ?, K, V> context) { + this(context.getConfiguration()); this.baseContext = context; - namedOutputs = getNamedOutputs(context); - recordWriters = Maps.newHashMap(); - taskContextCache = Maps.newHashMap(); - this.disableOutputCounters = context.getConfiguration().getBoolean(CRUNCH_DISABLE_OUTPUT_COUNTERS, false); } - + + public CrunchOutputs(Configuration conf) { + this.baseConf = conf; + this.namedOutputs = getNamedOutputs(conf); + this.outputStates = Maps.newHashMap(); + this.disableOutputCounters = conf.getBoolean(CRUNCH_DISABLE_OUTPUT_COUNTERS, false); + } + @SuppressWarnings("unchecked") public void write(String namedOutput, K key, V value) throws IOException, InterruptedException { @@ -126,63 +164,174 @@ public class CrunchOutputs<K, V> { throw new IllegalArgumentException("Undefined named output '" + namedOutput + "'"); } - TaskAttemptContext taskContext = getContext(namedOutput); if (!disableOutputCounters) { baseContext.getCounter(COUNTERS_GROUP, namedOutput).increment(1); } - getRecordWriter(taskContext, namedOutput).write(key, value); + getOutputState(namedOutput).write(key, value); } public void close() throws IOException, InterruptedException { - for (RecordWriter<?, ?> writer : recordWriters.values()) { - writer.close(baseContext); + for (OutputState<?, ?> out : outputStates.values()) { + out.close(); } } - private TaskAttemptContext getContext(String nameOutput) throws IOException { - TaskAttemptContext taskContext = taskContextCache.get(nameOutput); - if (taskContext != null) { - return taskContext; + private OutputState<K, V> getOutputState(String namedOutput) throws IOException, InterruptedException { + OutputState<?, ?> out = outputStates.get(namedOutput); + if (out != null) { + return (OutputState<K, V>) out; } // The following trick leverages the instantiation of a record writer via // the job thus supporting arbitrary output formats. - OutputConfig outConfig = namedOutputs.get(nameOutput); - Configuration conf = new Configuration(baseContext.getConfiguration()); - Job job = new Job(conf); - job.getConfiguration().set("crunch.namedoutput", nameOutput); - job.setOutputFormatClass(outConfig.bundle.getFormatClass()); - job.setOutputKeyClass(outConfig.keyClass); - job.setOutputValueClass(outConfig.valueClass); - outConfig.bundle.configure(job.getConfiguration()); - taskContext = TaskAttemptContextFactory.create( - job.getConfiguration(), baseContext.getTaskAttemptID()); - - taskContextCache.put(nameOutput, taskContext); - return taskContext; + Job job = getJob(namedOutput, baseConf); + OutputFormat<K, V> fmt = getOutputFormat(namedOutput, job, namedOutputs.get(namedOutput)); + TaskAttemptContext taskContext = null; + RecordWriter<K, V> recordWriter = null; + if (baseContext != null) { + taskContext = TaskAttemptContextFactory.create( + job.getConfiguration(), baseContext.getTaskAttemptID()); + recordWriter = fmt.getRecordWriter(taskContext); + } + OutputState<K, V> outputState = new OutputState(taskContext, recordWriter); + this.outputStates.put(namedOutput, outputState); + return outputState; } - - private synchronized RecordWriter<K, V> getRecordWriter( - TaskAttemptContext taskContext, String namedOutput) - throws IOException, InterruptedException { - // look for record-writer in the cache - RecordWriter<K, V> writer = recordWriters.get(namedOutput); - - // If not in cache, create a new one - if (writer == null) { - // get the record writer from context output format - taskContext.getConfiguration().set(BASE_OUTPUT_NAME, namedOutput); - try { - OutputFormat format = ReflectionUtils.newInstance( - taskContext.getOutputFormatClass(), - taskContext.getConfiguration()); - writer = format.getRecordWriter(taskContext); - } catch (ClassNotFoundException e) { - throw new IOException(e); + + private static Job getJob(String namedOutput, Configuration baseConf) throws IOException { + Job job = new Job(new Configuration(baseConf)); + job.getConfiguration().set("crunch.namedoutput", namedOutput); + return job; + } + + private static void configureJob( + String namedOutput, + Job job, + OutputConfig outConfig) throws IOException { + job.getConfiguration().set(BASE_OUTPUT_NAME, namedOutput); + job.setOutputFormatClass(outConfig.bundle.getFormatClass()); + job.setOutputKeyClass(outConfig.keyClass); + job.setOutputValueClass(outConfig.valueClass); + outConfig.bundle.configure(job.getConfiguration()); + } + + private static OutputFormat getOutputFormat( + String namedOutput, + Job job, + OutputConfig outConfig) throws IOException { + configureJob(namedOutput, job, outConfig); + try { + return ReflectionUtils.newInstance( + job.getOutputFormatClass(), + job.getConfiguration()); + } catch (ClassNotFoundException e) { + throw new IOException(e); + } + } + + private static class OutputState<K, V> { + private final TaskAttemptContext context; + private final RecordWriter<K, V> recordWriter; + + public OutputState(TaskAttemptContext context, RecordWriter<K, V> recordWriter) { + this.context = context; + this.recordWriter = recordWriter; + } + + public void write(K key, V value) throws IOException, InterruptedException { + recordWriter.write(key, value); + } + + public void close() throws IOException, InterruptedException { + recordWriter.close(context); + } + } + + private static class CompositeOutputCommitter extends OutputCommitter { + + private final Map<String, OutputConfig> outputs; + private final Map<String, OutputCommitter> committers; + + public CompositeOutputCommitter(Map<String, OutputConfig> outputs, Map<String, OutputCommitter> committers) { + this.outputs = outputs; + this.committers = committers; + } + + private TaskAttemptContext getContext(String namedOutput, TaskAttemptContext baseContext) throws IOException { + Job job = getJob(namedOutput, baseContext.getConfiguration()); + configureJob(namedOutput, job, outputs.get(namedOutput)); + return TaskAttemptContextFactory.create(job.getConfiguration(), baseContext.getTaskAttemptID()); + } + + @Override + public void setupJob(JobContext jobContext) throws IOException { + Configuration conf = jobContext.getConfiguration(); + for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) { + Job job = getJob(e.getKey(), conf); + configureJob(e.getKey(), job, outputs.get(e.getKey())); + e.getValue().setupJob(job); + } + } + + @Override + public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException { + for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) { + e.getValue().setupTask(getContext(e.getKey(), taskAttemptContext)); + } + } + + @Override + public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOException { + for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) { + if (e.getValue().needsTaskCommit(getContext(e.getKey(), taskAttemptContext))) { + return true; + } + } + return false; + } + + @Override + public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException { + for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) { + e.getValue().commitTask(getContext(e.getKey(), taskAttemptContext)); + } + } + + @Override + public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException { + for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) { + e.getValue().abortTask(getContext(e.getKey(), taskAttemptContext)); + } + } + + @Override + public void commitJob(JobContext jobContext) throws IOException { + Configuration conf = jobContext.getConfiguration(); + Set<Path> handledPaths = Sets.newHashSet(); + for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) { + OutputCommitter oc = e.getValue(); + if (oc instanceof FileOutputCommitter) { + Path workPath = ((FileOutputCommitter) oc).getWorkPath(); + if (handledPaths.contains(workPath)) { + continue; + } else { + handledPaths.add(workPath); + } + } + Job job = getJob(e.getKey(), conf); + configureJob(e.getKey(), job, outputs.get(e.getKey())); + oc.commitJob(job); + } + } + + @Override + public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException { + Configuration conf = jobContext.getConfiguration(); + for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) { + Job job = getJob(e.getKey(), conf); + configureJob(e.getKey(), job, outputs.get(e.getKey())); + e.getValue().abortJob(job, state); } - recordWriters.put(namedOutput, writer); } - - return writer; } }
