Repository: hadoop Updated Branches: refs/heads/branch-3.0 7193c9558 -> c960ae0b9
MAPREDUCE-7060. Cherry Pick PathOutputCommitter class/factory to branch-3.0. Contributed by Steve Loughran. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c960ae0b Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c960ae0b Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c960ae0b Branch: refs/heads/branch-3.0 Commit: c960ae0b9a9f47bb440800eb0c94eea877992bfb Parents: 7193c95 Author: Steve Loughran <[email protected]> Authored: Thu Mar 1 18:39:54 2018 +0000 Committer: Steve Loughran <[email protected]> Committed: Thu Mar 1 18:39:54 2018 +0000 ---------------------------------------------------------------------- .../mapreduce/TestMapreduceConfigFields.java | 27 +- .../lib/output/BindingPathOutputCommitter.java | 184 +++++++ .../lib/output/FileOutputCommitter.java | 12 +- .../lib/output/FileOutputCommitterFactory.java | 38 ++ .../mapreduce/lib/output/FileOutputFormat.java | 10 +- .../lib/output/NamedCommitterFactory.java | 79 +++ .../lib/output/PathOutputCommitter.java | 17 + .../lib/output/PathOutputCommitterFactory.java | 204 ++++++++ .../src/main/resources/mapred-default.xml | 11 + .../lib/output/TestPathOutputCommitter.java | 24 +- .../output/TestPathOutputCommitterFactory.java | 495 +++++++++++++++++++ 11 files changed, 1080 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/c960ae0b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/TestMapreduceConfigFields.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/TestMapreduceConfigFields.java 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/TestMapreduceConfigFields.java index 5d42fbf..f8aaab7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/TestMapreduceConfigFields.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/TestMapreduceConfigFields.java @@ -27,6 +27,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory; import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; /** @@ -53,14 +54,23 @@ public class TestMapreduceConfigFields extends TestConfigurationFieldsBase { @SuppressWarnings("deprecation") @Override public void initializeMemberVariables() { - xmlFilename = new String("mapred-default.xml"); - configurationClasses = new Class[] { MRJobConfig.class, MRConfig.class, - JHAdminConfig.class, ShuffleHandler.class, FileOutputFormat.class, - FileInputFormat.class, Job.class, NLineInputFormat.class, - JobConf.class, FileOutputCommitter.class }; + xmlFilename = "mapred-default.xml"; + configurationClasses = new Class[] { + MRJobConfig.class, + MRConfig.class, + JHAdminConfig.class, + ShuffleHandler.class, + FileOutputFormat.class, + FileInputFormat.class, + Job.class, + NLineInputFormat.class, + JobConf.class, + FileOutputCommitter.class, + PathOutputCommitterFactory.class + }; // Initialize used variables - configurationPropsToSkipCompare = new HashSet<String>(); + configurationPropsToSkipCompare = new HashSet<>(); // Set error modes errorIfMissingConfigProps = true; @@ -82,6 +92,11 @@ public class TestMapreduceConfigFields extends 
TestConfigurationFieldsBase { MRJobConfig.MAP_RESOURCE_TYPE_PREFIX); configurationPropsToSkipCompare.add( MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX); + + // PathOutputCommitterFactory values + xmlPrefixToSkipCompare = new HashSet<>(); + xmlPrefixToSkipCompare.add( + PathOutputCommitterFactory.COMMITTER_FACTORY_SCHEME); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c960ae0b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/BindingPathOutputCommitter.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/BindingPathOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/BindingPathOutputCommitter.java new file mode 100644 index 0000000..f12678b --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/BindingPathOutputCommitter.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.lib.output; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +/** + * This is a special committer which creates the factory for the committer and + * runs off that. Why does it exist? So that you can explicitly instantiate + * a committer by classname and yet still have the actual implementation + * driven dynamically by the factory options and destination filesystem. + * This simplifies integration + * with existing code which takes the classname of a committer. + * There's no factory for this, as that would lead to a loop. + * + * All commit protocol methods and accessors are delegated to the + * wrapped committer. + * + * How to use: + * + * <ol> + * <li> + * In applications which take a classname of committer in + * a configuration option, set it to the canonical name of this class + * (see {@link #NAME}). When this class is instantiated, it will + * use the factory mechanism to locate the configured committer for the + * destination. + * </li> + * <li> + * In code, explicitly create an instance of this committer through + * its constructor, then invoke commit lifecycle operations on it. + * The dynamically configured committer will be created in the constructor + * and have the lifecycle operations relayed to it. + * </li> + * </ol> + * + */ [email protected] [email protected] +public class BindingPathOutputCommitter extends PathOutputCommitter { + + /** + * The classname for use in configurations. + */ + public static final String NAME + = BindingPathOutputCommitter.class.getCanonicalName(); + + /** + * The bound committer. 
+ */ + private final PathOutputCommitter committer; + + /** + * Instantiate. + * @param outputPath output path (may be null) + * @param context task context + * @throws IOException on any failure. + */ + public BindingPathOutputCommitter(Path outputPath, + TaskAttemptContext context) throws IOException { + super(outputPath, context); + committer = PathOutputCommitterFactory.getCommitterFactory(outputPath, + context.getConfiguration()) + .createOutputCommitter(outputPath, context); + } + + @Override + public Path getOutputPath() { + return committer.getOutputPath(); + } + + @Override + public Path getWorkPath() throws IOException { + return committer.getWorkPath(); + } + + @Override + public void setupJob(JobContext jobContext) throws IOException { + committer.setupJob(jobContext); + } + + @Override + public void setupTask(TaskAttemptContext taskContext) throws IOException { + committer.setupTask(taskContext); + } + + @Override + public boolean needsTaskCommit(TaskAttemptContext taskContext) + throws IOException { + return committer.needsTaskCommit(taskContext); + } + + @Override + public void commitTask(TaskAttemptContext taskContext) throws IOException { + committer.commitTask(taskContext); + } + + @Override + public void abortTask(TaskAttemptContext taskContext) throws IOException { + committer.abortTask(taskContext); + } + + @Override + @SuppressWarnings("deprecation") + public void cleanupJob(JobContext jobContext) throws IOException { + super.cleanupJob(jobContext); + } + + @Override + public void commitJob(JobContext jobContext) throws IOException { + committer.commitJob(jobContext); + } + + @Override + public void abortJob(JobContext jobContext, JobStatus.State state) + throws IOException { + committer.abortJob(jobContext, state); + } + + @SuppressWarnings("deprecation") + @Override + public boolean isRecoverySupported() { + return committer.isRecoverySupported(); + } + + @Override + public boolean isCommitJobRepeatable(JobContext jobContext) + throws 
IOException { + return committer.isCommitJobRepeatable(jobContext); + } + + @Override + public boolean isRecoverySupported(JobContext jobContext) throws IOException { + return committer.isRecoverySupported(jobContext); + } + + @Override + public void recoverTask(TaskAttemptContext taskContext) throws IOException { + committer.recoverTask(taskContext); + } + + @Override + public boolean hasOutputPath() { + return committer.hasOutputPath(); + } + + @Override + public String toString() { + return "BindingPathOutputCommitter{" + + "committer=" + committer + + '}'; + } + + /** + * Get the inner committer. + * @return the bound committer. + */ + public PathOutputCommitter getCommitter() { + return committer; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c960ae0b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java index 325b2e7..86af2cf 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java @@ -155,17 +155,11 @@ public class FileOutputCommitter extends PathOutputCommitter { * @return the path where final output of the job should be placed. This * could also be considered the committed application attempt path. 
*/ - private Path getOutputPath() { + @Override + public Path getOutputPath() { return this.outputPath; } - - /** - * @return true if we have an output path set, else false. - */ - private boolean hasOutputPath() { - return this.outputPath != null; - } - + /** * @return the path where the output of pending job attempts are * stored. http://git-wip-us.apache.org/repos/asf/hadoop/blob/c960ae0b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitterFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitterFactory.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitterFactory.java new file mode 100644 index 0000000..12b2841 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitterFactory.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.lib.output; + +import java.io.IOException; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +/** + * Creates a {@link FileOutputCommitter}, always. + */ +public final class FileOutputCommitterFactory + extends PathOutputCommitterFactory { + + @Override + public PathOutputCommitter createOutputCommitter(Path outputPath, + TaskAttemptContext context) throws IOException { + return createFileOutputCommitter(outputPath, context); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c960ae0b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java index 0e7efa3..bbda26a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java @@ -328,12 +328,14 @@ public abstract class FileOutputFormat<K, V> extends OutputFormat<K, V> { job.getConfiguration().set(BASE_OUTPUT_NAME, name); } - public synchronized - OutputCommitter getOutputCommitter(TaskAttemptContext context - ) throws IOException { + public synchronized + OutputCommitter getOutputCommitter(TaskAttemptContext context) + throws IOException { if (committer == null) { Path output = 
getOutputPath(context); - committer = new FileOutputCommitter(output, context); + committer = PathOutputCommitterFactory.getCommitterFactory( + output, + context.getConfiguration()).createOutputCommitter(output, context); } return committer; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c960ae0b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/NamedCommitterFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/NamedCommitterFactory.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/NamedCommitterFactory.java new file mode 100644 index 0000000..b7378af --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/NamedCommitterFactory.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapreduce.lib.output; + +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; + +import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +/** + * A factory which creates any named committer identified + * in the option {@link PathOutputCommitterFactory#NAMED_COMMITTER_CLASS}. + */ +public final class NamedCommitterFactory extends + PathOutputCommitterFactory { + private static final Logger LOG = + LoggerFactory.getLogger(NamedCommitterFactory.class); + + @SuppressWarnings("JavaReflectionMemberAccess") + @Override + public PathOutputCommitter createOutputCommitter(Path outputPath, + TaskAttemptContext context) throws IOException { + Class<? extends PathOutputCommitter> clazz = loadCommitterClass(context); + LOG.debug("Using PathOutputCommitter implementation {}", clazz); + try { + Constructor<? extends PathOutputCommitter> ctor + = clazz.getConstructor(Path.class, TaskAttemptContext.class); + return ctor.newInstance(outputPath, context); + } catch (NoSuchMethodException + | InstantiationException + | IllegalAccessException + | InvocationTargetException e) { + throw new IOException("Failed to create " + clazz + + ":" + e, e); + } + } + + /** + * Load the class named in {@link #NAMED_COMMITTER_CLASS}. + * @param context job or task context + * @return the committer class + * @throws IOException if no committer was defined. + */ + private Class<? 
extends PathOutputCommitter> loadCommitterClass( + JobContext context) throws IOException { + Preconditions.checkNotNull(context, "null context"); + Configuration conf = context.getConfiguration(); + String value = conf.get(NAMED_COMMITTER_CLASS, ""); + if (value.isEmpty()) { + throw new IOException("No committer defined in " + NAMED_COMMITTER_CLASS); + } + return conf.getClass(NAMED_COMMITTER_CLASS, + FileOutputCommitter.class, PathOutputCommitter.class); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c960ae0b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitter.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitter.java index 2df30ba..3679d9f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitter.java @@ -76,9 +76,26 @@ public abstract class PathOutputCommitter extends OutputCommitter { } /** + * Get the final directory where work will be placed once the job + * is committed. This may be null, in which case, there is no output + * path to write data to. + * @return the path where final output of the job should be placed. + */ + public abstract Path getOutputPath(); + + /** + * Predicate: is there an output path? + * @return true if we have an output path set, else false. 
+ */ + public boolean hasOutputPath() { + return getOutputPath() != null; + } + + /** * Get the directory that the task should write results into. * Warning: there's no guarantee that this work path is on the same * FS as the final output, or that it's visible across machines. + * May be null. * @return the work directory * @throws IOException IO problem */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/c960ae0b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitterFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitterFactory.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitterFactory.java new file mode 100644 index 0000000..7d214f2 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitterFactory.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.lib.output; + +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.util.ReflectionUtils; + +/** + * A factory for committers implementing the {@link PathOutputCommitter} + * methods, and so can be used from {@link FileOutputFormat}. + * The base implementation returns {@link FileOutputCommitter} instances. + * + * Algorithm: + * <ol> + * <li>If an explicit committer factory is named, it is used.</li> + * <li>The output path is examined. + * If it is non-null and a factory is configured for that filesystem's scheme, + * its factory is instantiated.</li> + * <li>Otherwise, an instance of {@link FileOutputCommitter} is + * created.</li> + * </ol> + * + * In {@link FileOutputFormat}, the created factory has its method + * {@link #createOutputCommitter(Path, TaskAttemptContext)} invoked with a task + * attempt context and a possibly null path. + * + */ [email protected] [email protected] +public class PathOutputCommitterFactory extends Configured { + private static final Logger LOG = + LoggerFactory.getLogger(PathOutputCommitterFactory.class); + + /** + * Name of the configuration option used to configure the + * output committer factory to use. If set, it takes priority over + * any factory configured for the destination filesystem's scheme. + */ + public static final String COMMITTER_FACTORY_CLASS = + "mapreduce.outputcommitter.factory.class"; + + /** + * Scheme prefix for per-filesystem scheme committers. 
+ */ + public static final String COMMITTER_FACTORY_SCHEME = + "mapreduce.outputcommitter.factory.scheme"; + + /** + * String format pattern for per-filesystem scheme committers. + */ + public static final String COMMITTER_FACTORY_SCHEME_PATTERN = + COMMITTER_FACTORY_SCHEME + ".%s"; + + + /** + * The {@link FileOutputCommitter} factory. + */ + public static final String FILE_COMMITTER_FACTORY = + "org.apache.hadoop.mapreduce.lib.output.FileOutputCommitterFactory"; + + /** + * The factory creating the committer named in {@link #NAMED_COMMITTER_CLASS}. + */ + public static final String NAMED_COMMITTER_FACTORY = + "org.apache.hadoop.mapreduce.lib.output.NamedCommitterFactory"; + + /** + * Configuration option naming the output committer class. + * The {@link NamedCommitterFactory} creates the committer listed in it. + */ + public static final String NAMED_COMMITTER_CLASS = + "mapreduce.outputcommitter.named.classname"; + + /** + * Default committer factory name: {@value}. + */ + public static final String COMMITTER_FACTORY_DEFAULT = + FILE_COMMITTER_FACTORY; + + /** + * Create an output committer for a task attempt. + * @param outputPath output path. This may be null. + * @param context context + * @return a new committer + * @throws IOException problems instantiating the committer + */ + public PathOutputCommitter createOutputCommitter( + Path outputPath, + TaskAttemptContext context) throws IOException { + return createFileOutputCommitter(outputPath, context); + } + + /** + * Create an instance of the default committer, a {@link FileOutputCommitter} + * for a task. + * @param outputPath the task's output path, or null if no output path + * has been defined. 
+ * @param context the task attempt context + * @return the committer to use + * @throws IOException problems instantiating the committer + */ + protected final PathOutputCommitter createFileOutputCommitter( + Path outputPath, + TaskAttemptContext context) throws IOException { + LOG.debug("Creating FileOutputCommitter for path {} and context {}", + outputPath, context); + return new FileOutputCommitter(outputPath, context); + } + + /** + * Get the committer factory for a configuration. + * @param outputPath the job's output path. If null, it means that the + * schema is unknown and a per-schema factory cannot be determined. + * @param conf configuration + * @return an instantiated committer factory + */ + public static PathOutputCommitterFactory getCommitterFactory( + Path outputPath, + Configuration conf) { + // determine which key to look up the overall one or a schema-specific + // key + LOG.debug("Looking for committer factory for path {}", outputPath); + String key = COMMITTER_FACTORY_CLASS; + if (StringUtils.isEmpty(conf.getTrimmed(key)) && outputPath != null) { + // there is no explicit factory and there's an output path + // Get the scheme of the destination + String scheme = outputPath.toUri().getScheme(); + + // and see if it has a key + String schemeKey = String.format(COMMITTER_FACTORY_SCHEME_PATTERN, + scheme); + if (StringUtils.isNotEmpty(conf.getTrimmed(schemeKey))) { + // it does, so use that key in the classname lookup + LOG.debug("Using schema-specific factory for {}", outputPath); + key = schemeKey; + } else { + LOG.debug("No scheme-specific factory defined in {}", schemeKey); + } + } + + // create the factory. Before using Configuration.getClass, check + // for an empty configuration value, as that raises ClassNotFoundException. + Class<? 
extends PathOutputCommitterFactory> factory; + String trimmedValue = conf.getTrimmed(key, ""); + if (StringUtils.isEmpty(trimmedValue)) { + // empty/null value, use default + LOG.debug("No output committer factory defined," + + " defaulting to FileOutputCommitterFactory"); + factory = FileOutputCommitterFactory.class; + } else { + // key is set, get the class + factory = conf.getClass(key, + FileOutputCommitterFactory.class, + PathOutputCommitterFactory.class); + LOG.debug("Using OutputCommitter factory class {} from key {}", + factory, key); + } + return ReflectionUtils.newInstance(factory, conf); + } + + /** + * Create the committer factory for a task attempt and destination, then + * create the committer from it. + * @param outputPath the task's output path, or null if no output path + * has been defined. + * @param context the task attempt context + * @return the committer to use + * @throws IOException problems instantiating the committer + */ + public static PathOutputCommitter createCommitter(Path outputPath, + TaskAttemptContext context) throws IOException { + return getCommitterFactory(outputPath, + context.getConfiguration()) + .createOutputCommitter(outputPath, context); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c960ae0b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml index 9d166c7..3bf2543 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml @@ -2043,4 
+2043,15 @@ <name>mapreduce.job.send-token-conf</name> <value></value> </property> + +<property> + <description> + The name of an output committer factory for MRv2 FileOutputFormat to use + for committing work. If set, overrides any per-filesystem committer + defined for the destination filesystem. + </description> + <name>mapreduce.outputcommitter.factory.class</name> + <value></value> +</property> + </configuration> http://git-wip-us.apache.org/repos/asf/hadoop/blob/c960ae0b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestPathOutputCommitter.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestPathOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestPathOutputCommitter.java index 9cff82f..3b73934 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestPathOutputCommitter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestPathOutputCommitter.java @@ -109,14 +109,34 @@ public class TestPathOutputCommitter extends Assert { public void abortTask(TaskAttemptContext taskContext) throws IOException { } + + @Override + public Path getOutputPath() { + return null; + } } /** * Stub task context. + * The {@link #getConfiguration()} method returns the configuration supplied + * in the constructor; while {@link #setOutputCommitter(OutputCommitter)} + * sets the committer returned in {@link #getOutputCommitter()}. + * Otherwise, the methods are all no-ops. 
*/ - public class TaskContext + public static class TaskContext implements TaskInputOutputContext<String, String, String, String> { + private final Configuration configuration; + + public TaskContext() { + this(new Configuration()); + } + + public TaskContext(Configuration conf) { + this.configuration = conf; + } + + private OutputCommitter outputCommitter; public void setOutputCommitter(OutputCommitter outputCommitter) { @@ -180,7 +200,7 @@ public class TestPathOutputCommitter extends Assert { @Override public Configuration getConfiguration() { - return null; + return configuration; } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/c960ae0b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestPathOutputCommitterFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestPathOutputCommitterFactory.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestPathOutputCommitterFactory.java new file mode 100644 index 0000000..13e1c61 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestPathOutputCommitterFactory.java @@ -0,0 +1,495 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.lib.output; + +import java.io.IOException; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; + +import static org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory.*; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; + +/** + * Test the committer factory logic, looking at the override + * and fallback behavior. + */ +@SuppressWarnings("unchecked") +public class TestPathOutputCommitterFactory extends Assert { + + private static final String HTTP_COMMITTER_FACTORY = String.format( + COMMITTER_FACTORY_SCHEME_PATTERN, "http"); + + private static final Path HTTP_PATH = new Path("http://hadoop.apache.org/"); + private static final Path HDFS_PATH = new Path("hdfs://localhost:8081/"); + + private TaskAttemptID taskAttemptID = + new TaskAttemptID("local", 0, TaskType.MAP, 1, 2); + + /** + * Set a factory for a schema, verify it works. + * @throws Throwable failure + */ + @Test + public void testCommitterFactoryForSchema() throws Throwable { + createCommitterFactory(SimpleCommitterFactory.class, + HTTP_PATH, + newBondedConfiguration()); + } + + /** + * A schema factory only affects that filesystem. 
+ * @throws Throwable failure + */ + @Test + public void testCommitterFactoryFallbackDefault() throws Throwable { + createCommitterFactory(FileOutputCommitterFactory.class, + HDFS_PATH, + newBondedConfiguration()); + } + + /** + * A schema factory only affects that filesystem; test through + * {@link PathOutputCommitterFactory#createCommitter(Path, TaskAttemptContext)}. + * @throws Throwable failure + */ + @Test + public void testCommitterFallbackDefault() throws Throwable { + createCommitter(FileOutputCommitter.class, + HDFS_PATH, + taskAttempt(newBondedConfiguration())); + } + + /** + * Verify that you can override any schema with an explicit name. + */ + @Test + public void testCommitterFactoryOverride() throws Throwable { + Configuration conf = newBondedConfiguration(); + // set up for the schema factory + // and then set a global one which overrides the others. + conf.set(COMMITTER_FACTORY_CLASS, OtherFactory.class.getName()); + createCommitterFactory(OtherFactory.class, HDFS_PATH, conf); + createCommitterFactory(OtherFactory.class, HTTP_PATH, conf); + } + + /** + * Verify that if the factory class option is "", schema factory + * resolution still works. + */ + @Test + public void testCommitterFactoryEmptyOption() throws Throwable { + Configuration conf = newBondedConfiguration(); + // set up for the schema factory + // and then set the global option to "", which must not override it. + conf.set(COMMITTER_FACTORY_CLASS, ""); + createCommitterFactory(SimpleCommitterFactory.class, HTTP_PATH, conf); + + // and HDFS, with no schema, falls back to the default + createCommitterFactory(FileOutputCommitterFactory.class, HDFS_PATH, conf); + } + + /** + * Verify that if the committer factory class is unknown, you cannot + * create committers. 
+ */ + @Test + public void testCommitterFactoryUnknown() throws Throwable { + Configuration conf = new Configuration(); + // set the factory to an unknown class + conf.set(COMMITTER_FACTORY_CLASS, "unknown"); + intercept(RuntimeException.class, + () -> getCommitterFactory(HDFS_PATH, conf)); + } + + /** + * Verify that if the committer output path is null, you get back + * a FileOutputCommitter with null output & work paths. + */ + @Test + public void testCommitterNullOutputPath() throws Throwable { + // bind http to schema + Configuration conf = newBondedConfiguration(); + // then ask committers for a null path + FileOutputCommitter committer = createCommitter( + FileOutputCommitterFactory.class, + FileOutputCommitter.class, + null, conf); + assertNull(committer.getOutputPath()); + assertNull(committer.getWorkPath()); + } + + /** + * Verify that if you explicitly name a committer, that takes priority + * over any filesystem committer. + */ + @Test + public void testNamedCommitterFactory() throws Throwable { + Configuration conf = new Configuration(); + // set up for the schema factory + conf.set(COMMITTER_FACTORY_CLASS, NAMED_COMMITTER_FACTORY); + conf.set(NAMED_COMMITTER_CLASS, SimpleCommitter.class.getName()); + SimpleCommitter sc = createCommitter( + NamedCommitterFactory.class, + SimpleCommitter.class, HDFS_PATH, conf); + assertEquals("Wrong output path from " + sc, + HDFS_PATH, + sc.getOutputPath()); + } + + /** + * Verify that if you explicitly name a committer and there's no + * path, the committer is picked up. 
+ */ + @Test + public void testNamedCommitterFactoryNullPath() throws Throwable { + Configuration conf = new Configuration(); + // set up for the named committer factory + conf.set(COMMITTER_FACTORY_CLASS, NAMED_COMMITTER_FACTORY); + conf.set(NAMED_COMMITTER_CLASS, SimpleCommitter.class.getName()); + SimpleCommitter sc = createCommitter( + NamedCommitterFactory.class, + SimpleCommitter.class, + null, conf); + assertNull(sc.getOutputPath()); + } + + /** + * Verify that a named committer is picked up for a null path when created + * via {@link PathOutputCommitterFactory#createCommitter(Path, TaskAttemptContext)}. + */ + @Test + public void testNamedCommitterNullPath() throws Throwable { + Configuration conf = new Configuration(); + // set up for the named committer factory + conf.set(COMMITTER_FACTORY_CLASS, NAMED_COMMITTER_FACTORY); + conf.set(NAMED_COMMITTER_CLASS, SimpleCommitter.class.getName()); + + SimpleCommitter sc = createCommitter( + SimpleCommitter.class, + null, taskAttempt(conf)); + assertNull(sc.getOutputPath()); + } + + /** + * Create a factory then a committer, validating the type of both. + * @param <T> type of factory + * @param <U> type of committer + * @param factoryClass expected factory class + * @param committerClass expected committer class + * @param path output path (may be null) + * @param conf configuration + * @return the committer + * @throws IOException failure to create + */ + private <T extends PathOutputCommitterFactory, U extends PathOutputCommitter> + U createCommitter(Class<T> factoryClass, + Class<U> committerClass, + Path path, + Configuration conf) throws IOException { + T f = createCommitterFactory(factoryClass, path, conf); + PathOutputCommitter committer = f.createOutputCommitter(path, + taskAttempt(conf)); + assertEquals(" Wrong committer for path " + path + " from factory " + f, + committerClass, committer.getClass()); + return (U) committer; + } + + /** + * Create a committer from a task context, via + * {@link PathOutputCommitterFactory#createCommitter(Path, TaskAttemptContext)}. 
+ * @param <U> type of committer + * @param committerClass expected committer class + * @param path output path (may be null) + * @param context task attempt context + * @return the committer + * @throws IOException failure to create + */ + private <U extends PathOutputCommitter> U createCommitter( + Class<U> committerClass, + Path path, + TaskAttemptContext context) throws IOException { + PathOutputCommitter committer = PathOutputCommitterFactory + .createCommitter(path, context); + assertEquals(" Wrong committer for path " + path, + committerClass, committer.getClass()); + return (U) committer; + } + + /** + * Create a factory, validating its type. + * @param factoryClass expected factory class + * @param path output path (may be null) + * @param conf configuration + * @param <T> type of factory + * @return the factory + */ + private <T extends PathOutputCommitterFactory> T createCommitterFactory( + Class<T> factoryClass, + Path path, + Configuration conf) { + PathOutputCommitterFactory factory = getCommitterFactory(path, conf); + assertEquals(" Wrong factory for path " + path, + factoryClass, factory.getClass()); + return (T)factory; + } + + /** + * Create a new task attempt context. + * @param conf config + * @return a new context + */ + private TaskAttemptContext taskAttempt(Configuration conf) { + return new TaskAttemptContextImpl(conf, taskAttemptID); + } + + /** + * Verify that explicitly selecting the file committer factory yields a + * FileOutputCommitter, even if a named committer class is also set. 
+ */ + @Test + public void testFileOutputCommitterFactory() throws Throwable { + Configuration conf = new Configuration(); + // set up for the schema factory + conf.set(COMMITTER_FACTORY_CLASS, FILE_COMMITTER_FACTORY); + conf.set(NAMED_COMMITTER_CLASS, SimpleCommitter.class.getName()); + getCommitterFactory(HDFS_PATH, conf); + createCommitter( + FileOutputCommitterFactory.class, + FileOutputCommitter.class, null, conf); + } + + /** + * Follow the entire committer chain down and create a new committer from + * the output format. + * @throws Throwable on a failure. + */ + @Test + public void testFileOutputFormatBinding() throws Throwable { + Configuration conf = newBondedConfiguration(); + conf.set(FileOutputFormat.OUTDIR, HTTP_PATH.toUri().toString()); + TextOutputFormat<String, String> off = new TextOutputFormat<>(); + SimpleCommitter committer = (SimpleCommitter) + off.getOutputCommitter(taskAttempt(conf)); + assertEquals("Wrong output path from "+ committer, + HTTP_PATH, + committer.getOutputPath()); + } + + /** + * Follow the entire committer chain down and create a new committer from + * the output format. + * @throws Throwable on a failure. + */ + @Test + public void testFileOutputFormatBindingNoPath() throws Throwable { + Configuration conf = new Configuration(); + conf.unset(FileOutputFormat.OUTDIR); + // set up for the schema factory + conf.set(COMMITTER_FACTORY_CLASS, NAMED_COMMITTER_FACTORY); + conf.set(NAMED_COMMITTER_CLASS, SimpleCommitter.class.getName()); + httpToSimpleFactory(conf); + TextOutputFormat<String, String> off = new TextOutputFormat<>(); + SimpleCommitter committer = (SimpleCommitter) + off.getOutputCommitter(taskAttempt(conf)); + assertNull("Output path from "+ committer, + committer.getOutputPath()); + } + + /** + * Bind the http schema CommitterFactory to {@link SimpleCommitterFactory}. 
+ * @param conf config to patch + */ + private Configuration httpToSimpleFactory(Configuration conf) { + conf.set(HTTP_COMMITTER_FACTORY, SimpleCommitterFactory.class.getName()); + return conf; + } + + + /** + * Create a configuration with the http schema bonded to the simple factory. + * @return a new, patched configuration + */ + private Configuration newBondedConfiguration() { + return httpToSimpleFactory(new Configuration()); + } + + /** + * Extract the (mandatory) cause of an exception. + * @param ex exception + * @param clazz expected class + * @return the cause, which will be of the expected type + * @throws AssertionError if there is a problem + */ + private <E extends Throwable> E verifyCauseClass(Throwable ex, + Class<E> clazz) throws AssertionError { + Throwable cause = ex.getCause(); + if (cause == null) { + throw new AssertionError("No cause", ex); + } + if (!cause.getClass().equals(clazz)) { + throw new AssertionError("Wrong cause class", cause); + } + return (E)cause; + } + + @Test + public void testBadCommitterFactory() throws Throwable { + expectFactoryConstructionFailure(HTTP_COMMITTER_FACTORY); + } + + @Test + public void testBoundCommitterWithSchema() throws Throwable { + // this verifies that a bound committer relays to the underlying committer + Configuration conf = newBondedConfiguration(); + TestPathOutputCommitter.TaskContext tac + = new TestPathOutputCommitter.TaskContext(conf); + BindingPathOutputCommitter committer + = new BindingPathOutputCommitter(HTTP_PATH, tac); + intercept(IOException.class, "setupJob", + () -> committer.setupJob(tac)); + } + + @Test + public void testBoundCommitterWithDefault() throws Throwable { + // this verifies that a bound committer relays to the underlying committer + Configuration conf = newBondedConfiguration(); + TestPathOutputCommitter.TaskContext tac + = new TestPathOutputCommitter.TaskContext(conf); + BindingPathOutputCommitter committer + = new BindingPathOutputCommitter(HDFS_PATH, tac); + 
assertEquals(FileOutputCommitter.class, + committer.getCommitter().getClass()); + } + + /** + * Set the specific key to a string which is not a factory class; expect + * a failure. + * @param key key to set + * @throws Throwable on a failure + */ + @SuppressWarnings("ThrowableNotThrown") + protected void expectFactoryConstructionFailure(String key) throws Throwable { + Configuration conf = new Configuration(); + conf.set(key, "Not a factory"); + RuntimeException ex = intercept(RuntimeException.class, + () -> getCommitterFactory(HTTP_PATH, conf)); + verifyCauseClass( + verifyCauseClass(ex, RuntimeException.class), + ClassNotFoundException.class); + } + + /** + * A simple committer. + */ + public static final class SimpleCommitter extends PathOutputCommitter { + + private final Path outputPath; + + public SimpleCommitter(Path outputPath, + TaskAttemptContext context) throws IOException { + super(outputPath, context); + this.outputPath = outputPath; + } + + @Override + public Path getWorkPath() throws IOException { + return null; + } + + /** + * Job setup throws an exception. + * @param jobContext Context of the job + * @throws IOException always + */ + @Override + public void setupJob(JobContext jobContext) throws IOException { + throw new IOException("setupJob"); + } + + @Override + public void setupTask(TaskAttemptContext taskContext) throws IOException { + + } + + @Override + public boolean needsTaskCommit(TaskAttemptContext taskContext) + throws IOException { + return false; + } + + @Override + public void commitTask(TaskAttemptContext taskContext) throws IOException { + + } + + @Override + public void abortTask(TaskAttemptContext taskContext) throws IOException { + + } + + @Override + public Path getOutputPath() { + return outputPath; + } + } + + /** + * The simple committer factory. 
+ */ + private static class SimpleCommitterFactory + extends PathOutputCommitterFactory { + + @Override + public PathOutputCommitter createOutputCommitter(Path outputPath, + TaskAttemptContext context) throws IOException { + return new SimpleCommitter(outputPath, context); + } + + } + + /** + * Some other factory. + */ + private static class OtherFactory extends PathOutputCommitterFactory { + + /** + * {@inheritDoc} + * @param outputPath output path. This may be null. + * @param context context + * @return a new SimpleCommitter + * @throws IOException if the committer cannot be constructed + */ + @Override + public PathOutputCommitter createOutputCommitter(Path outputPath, + TaskAttemptContext context) throws IOException { + return new SimpleCommitter(outputPath, context); + } + + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
