HADOOP-13786 Add S3A committer for zero-rename commits to S3 endpoints. Contributed by Steve Loughran and Ryan Blue.
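The committers added by this patch are selected through new configuration keys introduced in core-default.xml (see the diff below). As a minimal sketch, here is how a client might set them programmatically; the keys and their legal values are taken from the patch itself, while the wrapper class is illustrative only:

    import org.apache.hadoop.conf.Configuration;

    public class S3ACommitterConfigSketch {
      public static Configuration committerConf() {
        Configuration conf = new Configuration();
        // Committer to use for S3A output, one of: "file", "directory",
        // "partitioned", "magic" (per the new core-default.xml entry).
        conf.set("fs.s3a.committer.name", "directory");
        // Staging committer conflict policy: "fail", "append" or "replace".
        conf.set("fs.s3a.committer.staging.conflict-mode", "fail");
        return conf;
      }
    }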
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/de8b6ca5 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/de8b6ca5 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/de8b6ca5 Branch: refs/heads/YARN-5881 Commit: de8b6ca5ef8614de6d6277b7617e27c788b0555c Parents: 782ba3b Author: Steve Loughran <ste...@apache.org> Authored: Wed Nov 22 15:28:12 2017 +0000 Committer: Steve Loughran <ste...@apache.org> Committed: Wed Nov 22 15:28:12 2017 +0000 ---------------------------------------------------------------------- .../dev-support/findbugsExcludeFile.xml | 7 + .../apache/hadoop/fs/FSDataOutputStream.java | 9 + .../apache/hadoop/fs/PathExistsException.java | 4 +- .../org/apache/hadoop/fs/StorageStatistics.java | 5 + .../apache/hadoop/util/JsonSerialization.java | 299 +++ .../src/main/resources/core-default.xml | 117 +- .../hadoop/fs/contract/ContractTestUtils.java | 51 +- .../apache/hadoop/test/GenericTestUtils.java | 29 +- .../org/apache/hadoop/test/HadoopTestBase.java | 51 +- .../org/apache/hadoop/test/LambdaTestUtils.java | 144 +- .../hadoop/util/TestJsonSerialization.java | 185 ++ .../mapreduce/TestMapreduceConfigFields.java | 27 +- .../lib/output/BindingPathOutputCommitter.java | 184 ++ .../lib/output/FileOutputCommitter.java | 12 +- .../lib/output/FileOutputCommitterFactory.java | 38 + .../mapreduce/lib/output/FileOutputFormat.java | 10 +- .../lib/output/NamedCommitterFactory.java | 79 + .../lib/output/PathOutputCommitter.java | 17 + .../lib/output/PathOutputCommitterFactory.java | 204 ++ .../src/main/resources/mapred-default.xml | 22 + .../lib/output/TestPathOutputCommitter.java | 24 +- .../output/TestPathOutputCommitterFactory.java | 495 +++++ hadoop-tools/hadoop-aws/pom.xml | 46 +- .../hadoop/fs/s3a/AWSBadRequestException.java | 42 + .../hadoop/fs/s3a/AWSClientIOException.java | 3 +- .../hadoop/fs/s3a/AWSNoResponseException.java | 31 + .../hadoop/fs/s3a/AWSRedirectException.java | 38 + .../fs/s3a/AWSServiceThrottledException.java | 42 + .../hadoop/fs/s3a/AWSStatus500Exception.java | 37 + .../s3a/BlockingThreadPoolExecutorService.java | 2 +- .../org/apache/hadoop/fs/s3a/Constants.java | 72 +- .../fs/s3a/InconsistentAmazonS3Client.java | 232 ++- .../java/org/apache/hadoop/fs/s3a/Invoker.java | 485 +++++ .../java/org/apache/hadoop/fs/s3a/Listing.java | 26 +- .../java/org/apache/hadoop/fs/s3a/Retries.java | 92 + .../hadoop/fs/s3a/S3ABlockOutputStream.java | 307 +-- .../org/apache/hadoop/fs/s3a/S3ADataBlocks.java | 2 +- .../org/apache/hadoop/fs/s3a/S3AFileSystem.java | 940 +++++---- .../apache/hadoop/fs/s3a/S3AInputStream.java | 56 +- .../hadoop/fs/s3a/S3AInstrumentation.java | 231 ++- .../apache/hadoop/fs/s3a/S3ARetryPolicy.java | 246 +++ .../hadoop/fs/s3a/S3AStorageStatistics.java | 12 +- .../java/org/apache/hadoop/fs/s3a/S3AUtils.java | 324 ++- .../org/apache/hadoop/fs/s3a/S3ListRequest.java | 11 + .../hadoop/fs/s3a/S3ObjectAttributes.java | 10 +- .../org/apache/hadoop/fs/s3a/Statistic.java | 56 +- .../hadoop/fs/s3a/WriteOperationHelper.java | 474 +++++ .../fs/s3a/commit/AbstractS3ACommitter.java | 756 +++++++ .../s3a/commit/AbstractS3ACommitterFactory.java | 90 + .../hadoop/fs/s3a/commit/CommitConstants.java | 240 +++ .../hadoop/fs/s3a/commit/CommitOperations.java | 596 ++++++ .../hadoop/fs/s3a/commit/CommitUtils.java | 129 ++ .../hadoop/fs/s3a/commit/CommitUtilsWithMR.java | 192 ++ .../apache/hadoop/fs/s3a/commit/Duration.java | 60 + .../hadoop/fs/s3a/commit/DurationInfo.java | 59 + 
.../s3a/commit/InternalCommitterConstants.java | 100 + .../hadoop/fs/s3a/commit/LocalTempDir.java | 80 + .../fs/s3a/commit/MagicCommitIntegration.java | 182 ++ .../hadoop/fs/s3a/commit/MagicCommitPaths.java | 229 ++ .../fs/s3a/commit/PathCommitException.java | 43 + .../apache/hadoop/fs/s3a/commit/PutTracker.java | 100 + .../fs/s3a/commit/S3ACommitterFactory.java | 129 ++ .../org/apache/hadoop/fs/s3a/commit/Tasks.java | 410 ++++ .../hadoop/fs/s3a/commit/ValidationFailure.java | 53 + .../hadoop/fs/s3a/commit/files/PendingSet.java | 192 ++ .../s3a/commit/files/PersistentCommitData.java | 69 + .../s3a/commit/files/SinglePendingCommit.java | 432 ++++ .../hadoop/fs/s3a/commit/files/SuccessData.java | 322 +++ .../fs/s3a/commit/files/package-info.java | 45 + .../fs/s3a/commit/magic/MagicCommitTracker.java | 161 ++ .../s3a/commit/magic/MagicS3GuardCommitter.java | 288 +++ .../magic/MagicS3GuardCommitterFactory.java | 47 + .../fs/s3a/commit/magic/package-info.java | 27 + .../hadoop/fs/s3a/commit/package-info.java | 28 + .../s3a/commit/staging/ConflictResolution.java | 33 + .../staging/DirectoryStagingCommitter.java | 116 ++ .../DirectoryStagingCommitterFactory.java | 48 + .../staging/PartitionedStagingCommitter.java | 159 ++ .../PartitionedStagingCommitterFactory.java | 48 + .../hadoop/fs/s3a/commit/staging/Paths.java | 300 +++ .../fs/s3a/commit/staging/StagingCommitter.java | 851 ++++++++ .../staging/StagingCommitterConstants.java | 64 + .../commit/staging/StagingCommitterFactory.java | 49 + .../fs/s3a/commit/staging/package-info.java | 27 + .../fs/s3a/s3guard/DynamoDBMetadataStore.java | 368 ++-- .../apache/hadoop/fs/s3a/s3guard/S3Guard.java | 2 + .../hadoop/fs/s3a/s3guard/S3GuardTool.java | 16 +- .../tools/hadoop-aws/committer_architecture.md | 1951 ++++++++++++++++++ .../markdown/tools/hadoop-aws/committers.md | 819 ++++++++ .../src/site/markdown/tools/hadoop-aws/index.md | 188 +- .../site/markdown/tools/hadoop-aws/s3guard.md | 7 +- .../site/markdown/tools/hadoop-aws/testing.md | 60 + .../tools/hadoop-aws/troubleshooting_s3a.md | 124 ++ .../s3a/ITestS3AContractGetFileStatus.java | 11 + .../hadoop/fs/s3a/AbstractS3AMockTest.java | 5 +- .../hadoop/fs/s3a/AbstractS3ATestBase.java | 23 +- .../hadoop/fs/s3a/ITestS3AConfiguration.java | 8 +- .../hadoop/fs/s3a/ITestS3AEncryptionSSEC.java | 6 +- .../hadoop/fs/s3a/ITestS3AFailureHandling.java | 49 +- .../fs/s3a/ITestS3AFileOperationCost.java | 2 + .../fs/s3a/ITestS3ATemporaryCredentials.java | 15 +- .../fs/s3a/ITestS3GuardListConsistency.java | 45 +- .../apache/hadoop/fs/s3a/MockS3AFileSystem.java | 322 +++ .../hadoop/fs/s3a/MockS3ClientFactory.java | 8 + .../apache/hadoop/fs/s3a/S3ATestConstants.java | 6 + .../org/apache/hadoop/fs/s3a/S3ATestUtils.java | 132 +- .../hadoop/fs/s3a/StorageStatisticsTracker.java | 99 + .../org/apache/hadoop/fs/s3a/TestInvoker.java | 460 +++++ .../org/apache/hadoop/fs/s3a/TestListing.java | 2 +- .../fs/s3a/TestS3AExceptionTranslation.java | 84 +- .../fs/s3a/commit/AbstractCommitITest.java | 412 ++++ .../fs/s3a/commit/AbstractITCommitMRJob.java | 324 +++ .../fs/s3a/commit/AbstractITCommitProtocol.java | 1371 ++++++++++++ .../fs/s3a/commit/CommitterFaultInjection.java | 44 + .../s3a/commit/CommitterFaultInjectionImpl.java | 131 ++ .../fs/s3a/commit/ITestCommitOperations.java | 545 +++++ .../fs/s3a/commit/LoggingTextOutputFormat.java | 133 ++ .../fs/s3a/commit/MiniDFSClusterService.java | 79 + .../fs/s3a/commit/TestMagicCommitPaths.java | 246 +++ .../apache/hadoop/fs/s3a/commit/TestTasks.java | 550 +++++ 
.../fs/s3a/commit/magic/ITMagicCommitMRJob.java | 70 + .../commit/magic/ITestMagicCommitProtocol.java | 190 ++ .../commit/magic/ITestS3AHugeMagicCommits.java | 195 ++ .../commit/staging/MockedStagingCommitter.java | 98 + .../staging/PartitionedCommitterForTesting.java | 58 + .../fs/s3a/commit/staging/StagingTestBase.java | 724 +++++++ .../hadoop/fs/s3a/commit/staging/TestPaths.java | 127 ++ .../commit/staging/TestStagingCommitter.java | 696 +++++++ .../TestStagingDirectoryOutputCommitter.java | 138 ++ .../TestStagingPartitionedFileListing.java | 186 ++ .../TestStagingPartitionedJobCommit.java | 236 +++ .../TestStagingPartitionedTaskCommit.java | 237 +++ .../integration/ITDirectoryCommitMRJob.java | 33 + .../integration/ITPartitionCommitMRJob.java | 33 + .../integration/ITStagingCommitMRJob.java | 66 + .../ITestDirectoryCommitProtocol.java | 131 ++ .../ITestPartitionedCommitProtocol.java | 139 ++ .../integration/ITestStagingCommitProtocol.java | 190 ++ .../s3guard/AbstractS3GuardToolTestBase.java | 23 + .../s3a/s3guard/TestDynamoDBMetadataStore.java | 13 +- .../fs/s3a/scale/AbstractSTestS3AHugeFiles.java | 191 +- .../hadoop-aws/src/test/resources/core-site.xml | 9 +- .../src/test/resources/log4j.properties | 42 +- .../registry/client/binding/JsonSerDeser.java | 224 +- 144 files changed, 24108 insertions(+), 1172 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml b/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml index 4bafd8e..c056d21 100644 --- a/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml +++ b/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml @@ -446,4 +446,11 @@ <Method name="setInstance"/> <Bug pattern="ME_ENUM_FIELD_SETTER"/> </Match> + + <!-- findbugs is complaining that a stream isn't closed. It will be. --> + <Match> + <Class name="org.apache.hadoop.util.JsonSerialization"/> + <Method name="save"/> + <Bug pattern="OBL_UNSATISFIED_OBLIGATION"/> + </Match> </FindBugsFilter> http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java index 1d95cd3..5970373 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java @@ -101,6 +101,15 @@ public class FSDataOutputStream extends DataOutputStream out.close(); // This invokes PositionCache.close() } + @Override + public String toString() { + final StringBuilder sb = new StringBuilder( + "FSDataOutputStream{"); + sb.append("wrappedStream=").append(wrappedStream); + sb.append('}'); + return sb.toString(); + } + /** * Get a reference to the wrapped output stream. 
* http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PathExistsException.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PathExistsException.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PathExistsException.java index ccc1f0c..cd9f70a 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PathExistsException.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PathExistsException.java @@ -27,7 +27,7 @@ public class PathExistsException extends PathIOException { super(path, "File exists"); } - protected PathExistsException(String path, String error) { + public PathExistsException(String path, String error) { super(path, error); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageStatistics.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageStatistics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageStatistics.java index d987ad0..5a3d736 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageStatistics.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageStatistics.java @@ -97,6 +97,11 @@ public abstract class StorageStatistics { public long getValue() { return value; } + + @Override + public String toString() { + return name + " = " + value; + } } private final String name; http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/JsonSerialization.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/JsonSerialization.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/JsonSerialization.java new file mode 100644 index 0000000..15f4fef --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/JsonSerialization.java @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.util; + +import java.io.EOFException; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +/** + * Support for marshalling objects to and from JSON. + * + * It constructs an object mapper as an instance field. + * and synchronizes access to those methods + * which use the mapper. + * + * This class was extracted from + * {@code org.apache.hadoop.registry.client.binding.JsonSerDeser}, + * which is now a subclass of this class. + * @param <T> Type to marshal. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class JsonSerialization<T> { + + private static final Logger LOG = + LoggerFactory.getLogger(JsonSerialization.class); + private static final String UTF_8 = "UTF-8"; + + private final Class<T> classType; + private final ObjectMapper mapper; + + /** + * Create an instance bound to a specific type. + * @param classType class to marshall + * @param failOnUnknownProperties fail if an unknown property is encountered. + * @param pretty generate pretty (indented) output? + */ + public JsonSerialization(Class<T> classType, + boolean failOnUnknownProperties, boolean pretty) { + Preconditions.checkArgument(classType != null, "null classType"); + this.classType = classType; + this.mapper = new ObjectMapper(); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, + failOnUnknownProperties); + mapper.configure(SerializationFeature.INDENT_OUTPUT, pretty); + } + + /** + * Get the simple name of the class type to be marshalled. + * @return the name of the class being marshalled + */ + public String getName() { + return classType.getSimpleName(); + } + + /** + * Convert from JSON. + * + * @param json input + * @return the parsed JSON + * @throws IOException IO problems + * @throws JsonParseException If the input is not well-formatted + * @throws JsonMappingException failure to map from the JSON to this class + */ + @SuppressWarnings("unchecked") + public synchronized T fromJson(String json) + throws IOException, JsonParseException, JsonMappingException { + if (json.isEmpty()) { + throw new EOFException("No data"); + } + try { + return mapper.readValue(json, classType); + } catch (IOException e) { + LOG.error("Exception while parsing json : {}\n{}", e, json, e); + throw e; + } + } + + /** + * Read from an input stream. 
+ * @param stream stream to read from + * @return the parsed entity + * @throws IOException IO problems + * @throws JsonParseException If the input is not well-formatted + * @throws JsonMappingException failure to map from the JSON to this class + */ + public synchronized T fromJsonStream(InputStream stream) throws IOException { + return mapper.readValue(stream, classType); + } + + /** + * Load from a JSON text file. + * @param jsonFile input file + * @return the parsed JSON + * @throws IOException IO problems + * @throws JsonParseException If the input is not well-formatted + * @throws JsonMappingException failure to map from the JSON to this class + */ + @SuppressWarnings("unchecked") + public synchronized T load(File jsonFile) + throws IOException, JsonParseException, JsonMappingException { + if (!jsonFile.isFile()) { + throw new FileNotFoundException("Not a file: " + jsonFile); + } + if (jsonFile.length() == 0) { + throw new EOFException("File is empty: " + jsonFile); + } + try { + return mapper.readValue(jsonFile, classType); + } catch (IOException e) { + LOG.error("Exception while parsing json file {}", jsonFile, e); + throw e; + } + } + + /** + * Save to a local file. Any existing file is overwritten unless + * the OS blocks that. + * @param file file + * @param path path + * @throws IOException IO exception + */ + public void save(File file, T instance) throws + IOException { + writeJsonAsBytes(instance, new FileOutputStream(file)); + } + + /** + * Convert from a JSON file. + * @param resource input file + * @return the parsed JSON + * @throws IOException IO problems + * @throws JsonParseException If the input is not well-formatted + * @throws JsonMappingException failure to map from the JSON to this class + */ + @SuppressWarnings({"IOResourceOpenedButNotSafelyClosed"}) + public synchronized T fromResource(String resource) + throws IOException, JsonParseException, JsonMappingException { + try (InputStream resStream = this.getClass() + .getResourceAsStream(resource)) { + if (resStream == null) { + throw new FileNotFoundException(resource); + } + return mapper.readValue(resStream, classType); + } catch (IOException e) { + LOG.error("Exception while parsing json resource {}", resource, e); + throw e; + } + } + + /** + * clone by converting to JSON and back again. + * This is much less efficient than any Java clone process. + * @param instance instance to duplicate + * @return a new instance + * @throws IOException IO problems. + */ + public T fromInstance(T instance) throws IOException { + return fromJson(toJson(instance)); + } + + /** + * Load from a Hadoop filesystem. + * There's a check for data availability after the file is open, by + * raising an EOFException if stream.available == 0. + * This allows for a meaningful exception without the round trip overhead + * of a getFileStatus call before opening the file. It may be brittle + * against an FS stream which doesn't return a value here, but the + * standard filesystems all do. + * JSON parsing and mapping problems + * are converted to IOEs. + * @param fs filesystem + * @param path path + * @return a loaded object + * @throws IOException IO or JSON parse problems + */ + public T load(FileSystem fs, Path path) throws IOException { + try (FSDataInputStream dataInputStream = fs.open(path)) { + // throw an EOF exception if there is no data available. 
+ if (dataInputStream.available() == 0) { + throw new EOFException("No data in " + path); + } + return fromJsonStream(dataInputStream); + } catch (JsonProcessingException e) { + throw new IOException( + String.format("Failed to read JSON file \"%s\": %s", path, e), + e); + } + } + + /** + * Save to a Hadoop filesystem. + * @param fs filesystem + * @param path path + * @param overwrite should any existing file be overwritten + * @throws IOException IO exception + */ + public void save(FileSystem fs, Path path, T instance, + boolean overwrite) throws + IOException { + writeJsonAsBytes(instance, fs.create(path, overwrite)); + } + + /** + * Write the JSON as bytes, then close the file. + * @param dataOutputStream an output stream that will always be closed + * @throws IOException on any failure + */ + private void writeJsonAsBytes(T instance, + OutputStream dataOutputStream) throws IOException { + try { + dataOutputStream.write(toBytes(instance)); + } finally { + dataOutputStream.close(); + } + } + + /** + * Convert JSON to bytes. + * @param instance instance to convert + * @return a byte array + * @throws IOException IO problems + */ + public byte[] toBytes(T instance) throws IOException { + return mapper.writeValueAsBytes(instance); + } + + /** + * Deserialize from a byte array. + * @param bytes byte array + * @throws IOException IO problems + * @throws EOFException not enough data + */ + public T fromBytes(byte[] bytes) throws IOException { + return fromJson(new String(bytes, 0, bytes.length, UTF_8)); + } + + /** + * Convert an instance to a JSON string. + * @param instance instance to convert + * @return a JSON string description + * @throws JsonProcessingException Json generation problems + */ + public synchronized String toJson(T instance) throws JsonProcessingException { + return mapper.writeValueAsString(instance); + } + + /** + * Convert an instance to a string form for output. This is a robust + * operation which will convert any JSON-generating exceptions into + * error text. + * @param instance non-null instance + * @return a JSON string + */ + public String toString(T instance) { + Preconditions.checkArgument(instance != null, "Null instance argument"); + try { + return toJson(instance); + } catch (JsonProcessingException e) { + return "Failed to convert to a string: " + e; + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml index 8db9f44..183faa5 100644 --- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml +++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml @@ -1123,7 +1123,8 @@ <property> <name>fs.s3a.multipart.purge.age</name> <value>86400</value> - <description>Minimum age in seconds of multipart uploads to purge. + <description>Minimum age in seconds of multipart uploads to purge + on startup if "fs.s3a.multipart.purge" is true </description> </property> @@ -1345,6 +1346,120 @@ </property> <property> + <name>fs.s3a.retry.limit</name> + <value>${fs.s3a.attempts.maximum}</value> + <description> + Number of times to retry any repeatable S3 client request on failure, + excluding throttling requests. 
+ </description> +</property> + +<property> + <name>fs.s3a.retry.interval</name> + <value>500ms</value> + <description> + Interval between attempts to retry operations for any reason other + than S3 throttle errors. + </description> +</property> + +<property> + <name>fs.s3a.retry.throttle.limit</name> + <value>${fs.s3a.attempts.maximum}</value> + <description> + Number of times to retry any throttled request. + </description> +</property> + +<property> + <name>fs.s3a.retry.throttle.interval</name> + <value>1000ms</value> + <description> + Interval between retry attempts on throttled requests. + </description> +</property> + +<property> + <name>fs.s3a.committer.name</name> + <value>file</value> + <description> + Committer to create for output to S3A, one of: + "file", "directory", "partitioned", "magic". + </description> +</property> + +<property> + <name>fs.s3a.committer.magic.enabled</name> + <value>false</value> + <description> + Enable support in the filesystem for the S3 "Magic" committer. + When working with AWS S3, S3Guard must be enabled for the destination + bucket, as consistent metadata listings are required. + </description> +</property> + +<property> + <name>fs.s3a.committer.threads</name> + <value>8</value> + <description> + Number of threads in committers for parallel operations on files + (upload, commit, abort, delete...) + </description> +</property> + +<property> + <name>fs.s3a.committer.staging.tmp.path</name> + <value>tmp/staging</value> + <description> + Path in the cluster filesystem for temporary data. + This is for HDFS, not the local filesystem. + It is only for the summary data of each file, not the actual + data being committed. + Using an unqualified path guarantees that the full path will be + generated relative to the home directory of the user creating the job, + hence private (assuming home directory permissions are secure). + </description> +</property> + +<property> + <name>fs.s3a.committer.staging.unique-filenames</name> + <value>true</value> + <description> + Option for final files to have a unique name through job attempt info, + or the value of fs.s3a.committer.staging.uuid + When writing data with the "append" conflict option, this guarantees + that new data will not overwrite any existing data. + </description> +</property> + +<property> + <name>fs.s3a.committer.staging.conflict-mode</name> + <value>fail</value> + <description> + Staging committer conflict resolution policy. + Supported: "fail", "append", "replace". + </description> +</property> + +<property> + <name>fs.s3a.committer.staging.abort.pending.uploads</name> + <value>true</value> + <description> + Should the staging committers abort all pending uploads to the destination + directory? + + Changing this if more than one partitioned committer is + writing to the same destination tree simultaneously; otherwise + the first job to complete will cancel all outstanding uploads from the + others. However, it may lead to leaked outstanding uploads from failed + tasks. If disabled, configure the bucket lifecycle to remove uploads + after a time period, and/or set up a workflow to explicitly delete + entries. Otherwise there is a risk that uncommitted uploads may run up + bills. 
+ </description> +</property> + +<property> <name>fs.AbstractFileSystem.s3a.impl</name> <value>org.apache.hadoop.fs.s3a.S3A</value> <description>The implementation class of the S3A AbstractFileSystem.</description> http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java index e0cc7d6..d2cbca0 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java @@ -725,6 +725,8 @@ public class ContractTestUtils extends Assert { /** * Read in "length" bytes, convert to an ascii string. + * This uses {@link #toChar(byte)} to escape bytes, so cannot be used + * for round trip operations. * @param fs filesystem * @param path path to read * @param length #of bytes to read. @@ -742,6 +744,28 @@ public class ContractTestUtils extends Assert { } /** + * Read in "length" bytes, convert to UTF8 string. + * @param fs filesystem + * @param path path to read + * @param length #of bytes to read. If -1: use file length. + * @return the bytes read and converted to a string + * @throws IOException IO problems + */ + public static String readUTF8(FileSystem fs, + Path path, + int length) throws IOException { + if (length < 0) { + FileStatus status = fs.getFileStatus(path); + length = (int) status.getLen(); + } + try (FSDataInputStream in = fs.open(path)) { + byte[] buf = new byte[length]; + in.readFully(0, buf); + return new String(buf, "UTF-8"); + } + } + + /** * Take an array of filestats and convert to a string * (prefixed with/ a [%02d] counter). * @param stats array of stats @@ -857,11 +881,30 @@ public class ContractTestUtils extends Assert { */ public static void assertPathExists(FileSystem fileSystem, String message, Path path) throws IOException { - if (!fileSystem.exists(path)) { + verifyPathExists(fileSystem, message, path); + } + + /** + * Verify that a path exists, returning the file status of the path. 
+ * + * @param fileSystem filesystem to examine + * @param message message to include in the assertion failure message + * @param path path in the filesystem + * @throws FileNotFoundException raised if the path is missing + * @throws IOException IO problems + */ + public static FileStatus verifyPathExists(FileSystem fileSystem, + String message, + Path path) throws IOException { + try { + return fileSystem.getFileStatus(path); + } catch (FileNotFoundException e) { //failure, report it - ls(fileSystem, path.getParent()); - throw new FileNotFoundException(message + ": not found " + path - + " in " + path.getParent()); + LOG.error("{}: not found {}; parent listing is:\n{}", + message, path, ls(fileSystem, path.getParent())); + throw (IOException)new FileNotFoundException( + message + ": not found " + path + " in " + path.getParent()) + .initCause(e); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java index 4cb9f8b..0db6c73 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java @@ -316,20 +316,37 @@ public abstract class GenericTestUtils { /** * Assert that an exception's <code>toString()</code> value * contained the expected text. - * @param string expected string + * @param expectedText expected string * @param t thrown exception * @throws AssertionError if the expected string is not found */ - public static void assertExceptionContains(String string, Throwable t) { + public static void assertExceptionContains(String expectedText, Throwable t) { + assertExceptionContains(expectedText, t, ""); + } + + /** + * Assert that an exception's <code>toString()</code> value + * contained the expected text. + * @param expectedText expected string + * @param t thrown exception + * @param message any extra text for the string + * @throws AssertionError if the expected string is not found + */ + public static void assertExceptionContains(String expectedText, + Throwable t, + String message) { Assert.assertNotNull(E_NULL_THROWABLE, t); String msg = t.toString(); if (msg == null) { throw new AssertionError(E_NULL_THROWABLE_STRING, t); } - if (!msg.contains(string)) { - throw new AssertionError("Expected to find '" + string + "' " - + E_UNEXPECTED_EXCEPTION + ":" - + StringUtils.stringifyException(t), + if (expectedText != null && !msg.contains(expectedText)) { + String prefix = org.apache.commons.lang.StringUtils.isEmpty(message) + ? 
"" : (message + ": "); + throw new AssertionError( + String.format("%s Expected to find '%s' %s: %s", + prefix, expectedText, E_UNEXPECTED_EXCEPTION, + StringUtils.stringifyException(t)), t); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/HadoopTestBase.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/HadoopTestBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/HadoopTestBase.java index 43d5be8..cb7df4b 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/HadoopTestBase.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/HadoopTestBase.java @@ -17,29 +17,37 @@ */ package org.apache.hadoop.test; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Rule; +import org.junit.rules.TestName; import org.junit.rules.Timeout; /** * A base class for JUnit4 tests that sets a default timeout for all tests - * that subclass this test + * that subclass this test. + * + * Threads are named to the method being executed, for ease of diagnostics + * in logs and thread dumps. */ -public abstract class HadoopTestBase { +public abstract class HadoopTestBase extends Assert { + /** - * System property name to set the test timeout: {@value} + * System property name to set the test timeout: {@value}. */ public static final String PROPERTY_TEST_DEFAULT_TIMEOUT = - "test.default.timeout"; + "test.default.timeout"; /** * The default timeout (in milliseconds) if the system property * {@link #PROPERTY_TEST_DEFAULT_TIMEOUT} - * is not set: {@value} + * is not set: {@value}. */ public static final int TEST_DEFAULT_TIMEOUT_VALUE = 100000; /** - * The JUnit rule that sets the default timeout for tests + * The JUnit rule that sets the default timeout for tests. */ @Rule public Timeout defaultTimeout = retrieveTestTimeout(); @@ -64,4 +72,35 @@ public abstract class HadoopTestBase { } return new Timeout(millis); } + + /** + * The method name. + */ + @Rule + public TestName methodName = new TestName(); + + /** + * Get the method name; defaults to the value of {@link #methodName}. + * Subclasses may wish to override it, which will tune the thread naming. + * @return the name of the method. + */ + protected String getMethodName() { + return methodName.getMethodName(); + } + + /** + * Static initializer names this thread "JUnit". + */ + @BeforeClass + public static void nameTestThread() { + Thread.currentThread().setName("JUnit"); + } + + /** + * Before each method, the thread is renamed to match the method name. 
+ */ + @Before + public void nameThreadToMethod() { + Thread.currentThread().setName("JUnit-" + getMethodName()); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java index 3ea9ab8..22208f7 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java @@ -19,11 +19,13 @@ package org.apache.hadoop.test; import com.google.common.base.Preconditions; +import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.util.Time; +import java.util.Optional; import java.util.concurrent.Callable; import java.util.concurrent.TimeoutException; @@ -43,7 +45,7 @@ import java.util.concurrent.TimeoutException; * check; useful when checking the contents of the exception. */ public final class LambdaTestUtils { - public static final Logger LOG = + private static final Logger LOG = LoggerFactory.getLogger(LambdaTestUtils.class); private LambdaTestUtils() { @@ -60,6 +62,7 @@ public final class LambdaTestUtils { * Interface to implement for converting a timeout into some form * of exception to raise. */ + @FunctionalInterface public interface TimeoutHandler { /** @@ -371,16 +374,11 @@ public final class LambdaTestUtils { Class<E> clazz, Callable<T> eval) throws Exception { - try { - T result = eval.call(); - throw new AssertionError("Expected an exception, got " - + robustToString(result)); - } catch (Throwable e) { - if (clazz.isAssignableFrom(e.getClass())) { - return (E)e; - } - throw e; - } + return intercept(clazz, + null, + "Expected a " + clazz.getName() + " to be thrown," + + " but got the result: ", + eval); } /** @@ -451,6 +449,59 @@ public final class LambdaTestUtils { } /** + * Intercept an exception; throw an {@code AssertionError} if one not raised. + * The caught exception is rethrown if it is of the wrong class or + * does not contain the text defined in {@code contained}. + * <p> + * Example: expect deleting a nonexistent file to raise a + * {@code FileNotFoundException} with the {@code toString()} value + * containing the text {@code "missing"}. + * <pre> + * FileNotFoundException ioe = intercept(FileNotFoundException.class, + * "missing", + * "path should not be found", + * () -> { + * filesystem.delete(new Path("/missing"), false); + * }); + * </pre> + * + * @param clazz class of exception; the raised exception must be this class + * <i>or a subclass</i>. + * @param contained string which must be in the {@code toString()} value + * of the exception + * @param message any message tho include in exception/log messages + * @param eval expression to eval + * @param <T> return type of expression + * @param <E> exception class + * @return the caught exception if it was of the expected type and contents + * @throws Exception any other exception raised + * @throws AssertionError if the evaluation call didn't raise an exception. + * The error includes the {@code toString()} value of the result, if this + * can be determined. 
+ * @see GenericTestUtils#assertExceptionContains(String, Throwable) + */ + public static <T, E extends Throwable> E intercept( + Class<E> clazz, + String contained, + String message, + Callable<T> eval) + throws Exception { + E ex; + try { + T result = eval.call(); + throw new AssertionError(message + ": " + robustToString(result)); + } catch (Throwable e) { + if (!clazz.isAssignableFrom(e.getClass())) { + throw e; + } else { + ex = (E) e; + } + } + GenericTestUtils.assertExceptionContains(contained, ex, message); + return ex; + } + + /** * Variant of {@link #intercept(Class, Callable)} to simplify void * invocations. * @param clazz class of exception; the raised exception must be this class @@ -468,9 +519,41 @@ public final class LambdaTestUtils { String contained, VoidCallable eval) throws Exception { - E ex = intercept(clazz, eval); - GenericTestUtils.assertExceptionContains(contained, ex); - return ex; + return intercept(clazz, contained, + "Expecting " + clazz.getName() + + (contained != null? (" with text " + contained) : "") + + " but got ", + () -> { + eval.call(); + return "void"; + }); + } + + /** + * Variant of {@link #intercept(Class, Callable)} to simplify void + * invocations. + * @param clazz class of exception; the raised exception must be this class + * <i>or a subclass</i>. + * @param contained string which must be in the {@code toString()} value + * of the exception + * @param message any message tho include in exception/log messages + * @param eval expression to eval + * @param <E> exception class + * @return the caught exception if it was of the expected type + * @throws Exception any other exception raised + * @throws AssertionError if the evaluation call didn't raise an exception. + */ + public static <E extends Throwable> E intercept( + Class<E> clazz, + String contained, + String message, + VoidCallable eval) + throws Exception { + return intercept(clazz, contained, message, + () -> { + eval.call(); + return "void"; + }); } /** @@ -495,6 +578,38 @@ public final class LambdaTestUtils { } /** + * Assert that an optional value matches an expected one; + * checks include null and empty on the actual value. + * @param message message text + * @param expected expected value + * @param actual actual optional value + * @param <T> type + */ + public static <T> void assertOptionalEquals(String message, + T expected, + Optional<T> actual) { + Assert.assertNotNull(message, actual); + Assert.assertTrue(message +" -not present", actual.isPresent()); + Assert.assertEquals(message, expected, actual.get()); + } + + /** + * Assert that an optional value matches an expected one; + * checks include null and empty on the actual value. + * @param message message text + * @param expected expected value + * @param actual actual optional value + * @param <T> type + */ + public static <T> void assertOptionalUnset(String message, + Optional<T> actual) { + Assert.assertNotNull(message, actual); + if (actual.isPresent()) { + Assert.fail("Expected empty option, got " + actual.get().toString()); + } + } + + /** * Returns {@code TimeoutException} on a timeout. If * there was a inner class passed in, includes it as the * inner failure. @@ -638,6 +753,7 @@ public final class LambdaTestUtils { * A simple interface for lambdas, which returns nothing; this exists * to simplify lambda tests on operations with no return value. 
*/ + @FunctionalInterface public interface VoidCallable { void call() throws Exception; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestJsonSerialization.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestJsonSerialization.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestJsonSerialization.java new file mode 100644 index 0000000..991697d --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestJsonSerialization.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.util; + +import java.io.EOFException; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.Serializable; +import java.util.Objects; + +import com.fasterxml.jackson.core.JsonParseException; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.test.HadoopTestBase; +import org.apache.hadoop.test.LambdaTestUtils; + +/** + * Test the JSON serialization helper. 
+ */ +public class TestJsonSerialization extends HadoopTestBase { + + private final JsonSerialization<KeyVal> serDeser = + new JsonSerialization<>(KeyVal.class, true, true); + + private final KeyVal source = new KeyVal("key", "1"); + + private static class KeyVal implements Serializable { + private String name; + private String value; + + KeyVal(String name, String value) { + this.name = name; + this.value = value; + } + + KeyVal() { + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("SimpleJson{"); + sb.append("name='").append(name).append('\''); + sb.append(", value='").append(value).append('\''); + sb.append('}'); + return sb.toString(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + KeyVal that = (KeyVal) o; + return Objects.equals(name, that.name) && + Objects.equals(value, that.value); + } + + @Override + public int hashCode() { + return Objects.hash(name, value); + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getValue() { + return value; + } + + public void setValue(String value) { + this.value = value; + } + } + + @Test + public void testStringRoundTrip() throws Throwable { + String wire = serDeser.toJson(source); + KeyVal unmarshalled = serDeser.fromJson(wire); + assertEquals("Failed to unmarshall: " + wire, source, unmarshalled); + } + + @Test + public void testBytesRoundTrip() throws Throwable { + byte[] wire = serDeser.toBytes(source); + KeyVal unmarshalled = serDeser.fromBytes(wire); + assertEquals(source, unmarshalled); + } + + @Test + public void testBadBytesRoundTrip() throws Throwable { + LambdaTestUtils.intercept(JsonParseException.class, + "token", + () -> serDeser.fromBytes(new byte[]{'a'})); + } + + @Test + public void testCloneViaJson() throws Throwable { + KeyVal unmarshalled = serDeser.fromInstance(source); + assertEquals(source, unmarshalled); + } + + @Test + public void testFileRoundTrip() throws Throwable { + File tempFile = File.createTempFile("Keyval", ".json"); + tempFile.delete(); + try { + serDeser.save(tempFile, source); + assertEquals(source, serDeser.load(tempFile)); + } finally { + tempFile.delete(); + } + } + + @Test + public void testEmptyFile() throws Throwable { + File tempFile = File.createTempFile("Keyval", ".json"); + try { + LambdaTestUtils.intercept(EOFException.class, + "empty", + () -> serDeser.load(tempFile)); + } finally { + tempFile.delete(); + } + } + + @Test + public void testFileSystemRoundTrip() throws Throwable { + File tempFile = File.createTempFile("Keyval", ".json"); + tempFile.delete(); + Path tempPath = new Path(tempFile.toURI()); + LocalFileSystem fs = FileSystem.getLocal(new Configuration()); + try { + serDeser.save(fs, tempPath, source, false); + assertEquals(source, serDeser.load(fs, tempPath)); + } finally { + fs.delete(tempPath, false); + } + } + + @Test + public void testFileSystemEmptyPath() throws Throwable { + File tempFile = File.createTempFile("Keyval", ".json"); + Path tempPath = new Path(tempFile.toURI()); + LocalFileSystem fs = FileSystem.getLocal(new Configuration()); + try { + LambdaTestUtils.intercept(EOFException.class, + () -> serDeser.load(fs, tempPath)); + fs.delete(tempPath, false); + LambdaTestUtils.intercept(FileNotFoundException.class, + () -> serDeser.load(fs, tempPath)); + } finally { + fs.delete(tempPath, false); + } + } + + +} 
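The TestJsonSerialization suite above exercises the new JsonSerialization helper end to end. For orientation, a self-contained usage sketch of the same API follows; the Event bean is invented for illustration, and only JsonSerialization itself comes from this patch:

    import java.io.File;
    import org.apache.hadoop.util.JsonSerialization;

    public class JsonSerializationSketch {
      public static class Event {
        private String id;
        public Event() { }                      // no-arg constructor for Jackson
        public String getId() { return id; }
        public void setId(String id) { this.id = id; }
      }

      public static void main(String[] args) throws Exception {
        // failOnUnknownProperties=true, pretty=true: the same flags the test uses.
        JsonSerialization<Event> serDeser =
            new JsonSerialization<>(Event.class, true, true);
        Event event = new Event();
        event.setId("attempt_001");
        String json = serDeser.toJson(event);   // marshal to a JSON string
        Event parsed = serDeser.fromJson(json); // and unmarshal it again
        File file = File.createTempFile("event", ".json");
        try {
          serDeser.save(file, event);           // write to a local file...
          Event loaded = serDeser.load(file);   // ...and read it back
        } finally {
          file.delete();
        }
      }
    }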
http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/TestMapreduceConfigFields.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/TestMapreduceConfigFields.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/TestMapreduceConfigFields.java index 5d42fbf..f8aaab7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/TestMapreduceConfigFields.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/TestMapreduceConfigFields.java @@ -27,6 +27,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory; import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; /** @@ -53,14 +54,23 @@ public class TestMapreduceConfigFields extends TestConfigurationFieldsBase { @SuppressWarnings("deprecation") @Override public void initializeMemberVariables() { - xmlFilename = new String("mapred-default.xml"); - configurationClasses = new Class[] { MRJobConfig.class, MRConfig.class, - JHAdminConfig.class, ShuffleHandler.class, FileOutputFormat.class, - FileInputFormat.class, Job.class, NLineInputFormat.class, - JobConf.class, FileOutputCommitter.class }; + xmlFilename = "mapred-default.xml"; + configurationClasses = new Class[] { + MRJobConfig.class, + MRConfig.class, + JHAdminConfig.class, + ShuffleHandler.class, + FileOutputFormat.class, + FileInputFormat.class, + Job.class, + NLineInputFormat.class, + JobConf.class, + FileOutputCommitter.class, + PathOutputCommitterFactory.class + }; // Initialize used variables - configurationPropsToSkipCompare = new HashSet<String>(); + configurationPropsToSkipCompare = new HashSet<>(); // Set error modes errorIfMissingConfigProps = true; @@ -82,6 +92,11 @@ public class TestMapreduceConfigFields extends TestConfigurationFieldsBase { MRJobConfig.MAP_RESOURCE_TYPE_PREFIX); configurationPropsToSkipCompare.add( MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX); + + // PathOutputCommitterFactory values + xmlPrefixToSkipCompare = new HashSet<>(); + xmlPrefixToSkipCompare.add( + PathOutputCommitterFactory.COMMITTER_FACTORY_SCHEME); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/BindingPathOutputCommitter.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/BindingPathOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/BindingPathOutputCommitter.java new file mode 100644 index 0000000..f12678b --- /dev/null +++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/BindingPathOutputCommitter.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.lib.output; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +/** + * This is a special committer which creates the factory for the committer and + * runs off that. Why does it exist? So that you can explicitly instantiate + * a committer by classname and yet still have the actual implementation + * driven dynamically by the factory options and destination filesystem. + * This simplifies integration + * with existing code which takes the classname of a committer. + * There's no factory for this, as that would lead to a loop. + * + * All commit protocol methods and accessors are delegated to the + * wrapped committer. + * + * How to use: + * + * <ol> + * <li> + * In applications which take a classname of committer in + * a configuration option, set it to the canonical name of this class + * (see {@link #NAME}). When this class is instantiated, it will + * use the factory mechanism to locate the configured committer for the + * destination. + * </li> + * <li> + * In code, explicitly create an instance of this committer through + * its constructor, then invoke commit lifecycle operations on it. + * The dynamically configured committer will be created in the constructor + * and have the lifecycle operations relayed to it. + * </li> + * </ol> + * + */ +@InterfaceAudience.Public +@InterfaceStability.Unstable +public class BindingPathOutputCommitter extends PathOutputCommitter { + + /** + * The classname for use in configurations. + */ + public static final String NAME + = BindingPathOutputCommitter.class.getCanonicalName(); + + /** + * The bound committer. + */ + private final PathOutputCommitter committer; + + /** + * Instantiate. + * @param outputPath output path (may be null) + * @param context task context + * @throws IOException on any failure. 
+ */ + public BindingPathOutputCommitter(Path outputPath, + TaskAttemptContext context) throws IOException { + super(outputPath, context); + committer = PathOutputCommitterFactory.getCommitterFactory(outputPath, + context.getConfiguration()) + .createOutputCommitter(outputPath, context); + } + + @Override + public Path getOutputPath() { + return committer.getOutputPath(); + } + + @Override + public Path getWorkPath() throws IOException { + return committer.getWorkPath(); + } + + @Override + public void setupJob(JobContext jobContext) throws IOException { + committer.setupJob(jobContext); + } + + @Override + public void setupTask(TaskAttemptContext taskContext) throws IOException { + committer.setupTask(taskContext); + } + + @Override + public boolean needsTaskCommit(TaskAttemptContext taskContext) + throws IOException { + return committer.needsTaskCommit(taskContext); + } + + @Override + public void commitTask(TaskAttemptContext taskContext) throws IOException { + committer.commitTask(taskContext); + } + + @Override + public void abortTask(TaskAttemptContext taskContext) throws IOException { + committer.abortTask(taskContext); + } + + @Override + @SuppressWarnings("deprecation") + public void cleanupJob(JobContext jobContext) throws IOException { + super.cleanupJob(jobContext); + } + + @Override + public void commitJob(JobContext jobContext) throws IOException { + committer.commitJob(jobContext); + } + + @Override + public void abortJob(JobContext jobContext, JobStatus.State state) + throws IOException { + committer.abortJob(jobContext, state); + } + + @SuppressWarnings("deprecation") + @Override + public boolean isRecoverySupported() { + return committer.isRecoverySupported(); + } + + @Override + public boolean isCommitJobRepeatable(JobContext jobContext) + throws IOException { + return committer.isCommitJobRepeatable(jobContext); + } + + @Override + public boolean isRecoverySupported(JobContext jobContext) throws IOException { + return committer.isRecoverySupported(jobContext); + } + + @Override + public void recoverTask(TaskAttemptContext taskContext) throws IOException { + committer.recoverTask(taskContext); + } + + @Override + public boolean hasOutputPath() { + return committer.hasOutputPath(); + } + + @Override + public String toString() { + return "BindingPathOutputCommitter{" + + "committer=" + committer + + '}'; + } + + /** + * Get the inner committer. + * @return the bonded committer. 
+ */ + public PathOutputCommitter getCommitter() { + return committer; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java index 325b2e7..86af2cf 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java @@ -155,17 +155,11 @@ public class FileOutputCommitter extends PathOutputCommitter { * @return the path where final output of the job should be placed. This * could also be considered the committed application attempt path. */ - private Path getOutputPath() { + @Override + public Path getOutputPath() { return this.outputPath; } - - /** - * @return true if we have an output path set, else false. - */ - private boolean hasOutputPath() { - return this.outputPath != null; - } - + /** * @return the path where the output of pending job attempts are * stored. http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitterFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitterFactory.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitterFactory.java new file mode 100644 index 0000000..12b2841 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitterFactory.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.lib.output; + +import java.io.IOException; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +/** + * Creates a {@link FileOutputCommitter}, always. 
+ */
+public final class FileOutputCommitterFactory
+    extends PathOutputCommitterFactory {
+
+  @Override
+  public PathOutputCommitter createOutputCommitter(Path outputPath,
+      TaskAttemptContext context) throws IOException {
+    return createFileOutputCommitter(outputPath, context);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java
index 0e7efa3..bbda26a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java
@@ -328,12 +328,14 @@ public abstract class FileOutputFormat<K, V> extends OutputFormat<K, V> {
     job.getConfiguration().set(BASE_OUTPUT_NAME, name);
   }
 
-  public synchronized
-     OutputCommitter getOutputCommitter(TaskAttemptContext context
-                                        ) throws IOException {
+  public synchronized
+      OutputCommitter getOutputCommitter(TaskAttemptContext context)
+      throws IOException {
     if (committer == null) {
       Path output = getOutputPath(context);
-      committer = new FileOutputCommitter(output, context);
+      committer = PathOutputCommitterFactory.getCommitterFactory(
+          output,
+          context.getConfiguration()).createOutputCommitter(output, context);
     }
     return committer;
   }
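This is the integration point: FileOutputFormat and every subclass now obtain their committer through the factory lookup rather than hard-coding FileOutputCommitter. A sketch of what that enables, assuming the S3A committer factory from this patch is on the classpath:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory;

    public class SchemeBindingExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // bind a committer factory to the "s3a" scheme; any FileOutputFormat
        // writing to an s3a:// destination will now commit through it
        conf.set(String.format(
            PathOutputCommitterFactory.COMMITTER_FACTORY_SCHEME_PATTERN, "s3a"),
            "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory");
        // destinations on other filesystems still get the classic
        // FileOutputCommitter from the default FileOutputCommitterFactory
      }
    }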
http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/NamedCommitterFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/NamedCommitterFactory.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/NamedCommitterFactory.java
new file mode 100644
index 0000000..b7378af
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/NamedCommitterFactory.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * A factory which creates any named committer identified
+ * in the option {@link PathOutputCommitterFactory#NAMED_COMMITTER_CLASS}.
+ */
+public final class NamedCommitterFactory extends
+    PathOutputCommitterFactory {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(NamedCommitterFactory.class);
+
+  @SuppressWarnings("JavaReflectionMemberAccess")
+  @Override
+  public PathOutputCommitter createOutputCommitter(Path outputPath,
+      TaskAttemptContext context) throws IOException {
+    Class<? extends PathOutputCommitter> clazz = loadCommitterClass(context);
+    LOG.debug("Using PathOutputCommitter implementation {}", clazz);
+    try {
+      Constructor<? extends PathOutputCommitter> ctor
+          = clazz.getConstructor(Path.class, TaskAttemptContext.class);
+      return ctor.newInstance(outputPath, context);
+    } catch (NoSuchMethodException
+        | InstantiationException
+        | IllegalAccessException
+        | InvocationTargetException e) {
+      throw new IOException("Failed to create " + clazz
+          + ":" + e, e);
+    }
+  }
+
+  /**
+   * Load the class named in {@link #NAMED_COMMITTER_CLASS}.
+   * @param context job or task context
+   * @return the committer class
+   * @throws IOException if no committer was defined.
+   */
+  private Class<? extends PathOutputCommitter> loadCommitterClass(
+      JobContext context) throws IOException {
+    Preconditions.checkNotNull(context, "null context");
+    Configuration conf = context.getConfiguration();
+    String value = conf.get(NAMED_COMMITTER_CLASS, "");
+    if (value.isEmpty()) {
+      throw new IOException("No committer defined in " + NAMED_COMMITTER_CLASS);
+    }
+    return conf.getClass(NAMED_COMMITTER_CLASS,
+        FileOutputCommitter.class, PathOutputCommitter.class);
+  }
+}
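A sketch of selecting a committer by classname through this factory; org.example.MyCommitter stands in for any committer exposing the (Path, TaskAttemptContext) constructor the reflection above requires:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory;

    public class NamedCommitterExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // route committer creation through NamedCommitterFactory...
        conf.set(PathOutputCommitterFactory.COMMITTER_FACTORY_CLASS,
            PathOutputCommitterFactory.NAMED_COMMITTER_FACTORY);
        // ...and name the class it should instantiate reflectively
        conf.set(PathOutputCommitterFactory.NAMED_COMMITTER_CLASS,
            "org.example.MyCommitter");
      }
    }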
http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitter.java
index 2df30ba..3679d9f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitter.java
@@ -76,9 +76,26 @@ public abstract class PathOutputCommitter extends OutputCommitter {
   }
 
   /**
+   * Get the final directory where work will be placed once the job
+   * is committed. This may be null, in which case there is no output
+   * path to write data to.
+   * @return the path where final output of the job should be placed.
+   */
+  public abstract Path getOutputPath();
+
+  /**
+   * Predicate: is there an output path?
+   * @return true if we have an output path set, else false.
+   */
+  public boolean hasOutputPath() {
+    return getOutputPath() != null;
+  }
+
+  /**
    * Get the directory that the task should write results into.
    * Warning: there's no guarantee that this work path is on the same
    * FS as the final output, or that it's visible across machines.
+   * May be null.
    * @return the work directory
    * @throws IOException IO problem
    */
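Under the revised contract, getOutputPath() is the one abstract accessor and hasOutputPath() is derived from it. A minimal, hypothetical subclass for illustration — a committer with no final output, so hasOutputPath() returns false:

    import java.io.IOException;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;

    /** Hypothetical committer which discards its output; not part of this patch. */
    public class DiscardingCommitter extends PathOutputCommitter {

      public DiscardingCommitter(Path outputPath, TaskAttemptContext context)
          throws IOException {
        super(outputPath, context);
      }

      /** No final output: hasOutputPath() therefore returns false. */
      @Override
      public Path getOutputPath() {
        return null;
      }

      /** No work directory either; callers must tolerate null. */
      @Override
      public Path getWorkPath() throws IOException {
        return null;
      }

      @Override
      public void setupJob(JobContext jobContext) throws IOException {
      }

      @Override
      public void setupTask(TaskAttemptContext taskContext) throws IOException {
      }

      @Override
      public boolean needsTaskCommit(TaskAttemptContext taskContext)
          throws IOException {
        return false;
      }

      @Override
      public void commitTask(TaskAttemptContext taskContext) throws IOException {
      }

      @Override
      public void abortTask(TaskAttemptContext taskContext) throws IOException {
      }
    }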
http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitterFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitterFactory.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitterFactory.java
new file mode 100644
index 0000000..0df14d1
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitterFactory.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * A factory for committers implementing the {@link PathOutputCommitter}
+ * methods, which can therefore be used from {@link FileOutputFormat}.
+ * The base implementation returns {@link FileOutputCommitter} instances.
+ *
+ * Algorithm:
+ * <ol>
+ * <li>If an explicit committer factory is named, it is used.</li>
+ * <li>The output path is examined.
+ * If it is non-null and there is a factory registered for that
+ * filesystem's scheme, that factory is instantiated.</li>
+ * <li>Otherwise, an instance of {@link FileOutputCommitter} is
+ * created.</li>
+ * </ol>
+ *
+ * In {@link FileOutputFormat}, the created factory has its
+ * {@link #createOutputCommitter(Path, TaskAttemptContext)} method invoked
+ * with a task attempt context and a possibly null path.
+ *
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class PathOutputCommitterFactory extends Configured {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(PathOutputCommitterFactory.class);
+
+  /**
+   * Name of the configuration option used to configure the
+   * output committer factory to use unless there is a specific
+   * one for a scheme.
+   */
+  public static final String COMMITTER_FACTORY_CLASS =
+      "mapreduce.outputcommitter.factory.class";
+
+  /**
+   * Scheme prefix for per-filesystem scheme committers.
+   */
+  public static final String COMMITTER_FACTORY_SCHEME =
+      "mapreduce.outputcommitter.factory.scheme";
+
+  /**
+   * String format pattern for per-filesystem scheme committers.
+   */
+  public static final String COMMITTER_FACTORY_SCHEME_PATTERN =
+      COMMITTER_FACTORY_SCHEME + ".%s";
+
+
+  /**
+   * The {@link FileOutputCommitter} factory.
+   */
+  public static final String FILE_COMMITTER_FACTORY =
+      "org.apache.hadoop.mapreduce.lib.output.FileOutputCommitterFactory";
+
+  /**
+   * The {@link NamedCommitterFactory} classname.
+   */
+  public static final String NAMED_COMMITTER_FACTORY =
+      "org.apache.hadoop.mapreduce.lib.output.NamedCommitterFactory";
+
+  /**
+   * The option used to identify the committer class for
+   * {@link NamedCommitterFactory} to instantiate.
+   */
+  public static final String NAMED_COMMITTER_CLASS =
+      "mapreduce.outputcommitter.named.classname";
+
+  /**
+   * Default committer factory name: {@value}.
+   */
+  public static final String COMMITTER_FACTORY_DEFAULT =
+      FILE_COMMITTER_FACTORY;
+
+  /**
+   * Create an output committer for a task attempt.
+   * @param outputPath output path. This may be null.
+   * @param context context
+   * @return a new committer
+   * @throws IOException problems instantiating the committer
+   */
+  public PathOutputCommitter createOutputCommitter(
+      Path outputPath,
+      TaskAttemptContext context) throws IOException {
+    return createFileOutputCommitter(outputPath, context);
+  }
+
+  /**
+   * Create an instance of the default committer, a {@link FileOutputCommitter}
+   * for a task.
+   * @param outputPath the task's output path, or null if no output path
+   * has been defined.
+   * @param context the task attempt context
+   * @return the committer to use
+   * @throws IOException problems instantiating the committer
+   */
+  protected final PathOutputCommitter createFileOutputCommitter(
+      Path outputPath,
+      TaskAttemptContext context) throws IOException {
+    LOG.debug("Creating FileOutputCommitter for path {} and context {}",
+        outputPath, context);
+    return new FileOutputCommitter(outputPath, context);
+  }
+
+  /**
+   * Get the committer factory for a configuration.
+   * @param outputPath the job's output path. If null, it means that the
+   * scheme is unknown and a per-scheme factory cannot be determined.
+   * @param conf configuration
+   * @return an instantiated committer factory
+   */
+  public static PathOutputCommitterFactory getCommitterFactory(
+      Path outputPath,
+      Configuration conf) {
+    // determine which key to look up: the overall one or a scheme-specific
+    // key
+    LOG.debug("Looking for committer factory for path {}", outputPath);
+    String key = COMMITTER_FACTORY_CLASS;
+    if (StringUtils.isEmpty(conf.getTrimmed(key)) && outputPath != null) {
+      // there is no explicit factory and there's an output path
+      // Get the scheme of the destination
+      String scheme = outputPath.toUri().getScheme();
+
+      // and see if it has a key
+      String schemeKey = String.format(COMMITTER_FACTORY_SCHEME_PATTERN,
+          scheme);
+      if (StringUtils.isNotEmpty(conf.getTrimmed(schemeKey))) {
+        // it does, so use that key in the classname lookup
+        LOG.debug("Using scheme-specific factory for {}", outputPath);
+        key = schemeKey;
+      } else {
+        LOG.debug("No scheme-specific factory defined in {}", schemeKey);
+      }
+    }
+
+    // create the factory. Before using Configuration.getClass, check
+    // for an empty configuration value, as that raises ClassNotFoundException.
+    Class<? extends PathOutputCommitterFactory> factory;
+    String trimmedValue = conf.getTrimmed(key, "");
+    if (StringUtils.isEmpty(trimmedValue)) {
+      // empty/null value, use default
+      LOG.debug("No output committer factory defined,"
+          + " defaulting to FileOutputCommitterFactory");
+      factory = FileOutputCommitterFactory.class;
+    } else {
+      // key is set, get the class
+      factory = conf.getClass(key,
+          FileOutputCommitterFactory.class,
+          PathOutputCommitterFactory.class);
+      LOG.debug("Using OutputCommitter factory class {} from key {}",
+          factory, key);
+    }
+    return ReflectionUtils.newInstance(factory, conf);
+  }
+
+  /**
+   * Create the committer factory for a task attempt and destination, then
+   * create the committer from it.
+   * @param outputPath the task's output path, or null if no output path
+   * has been defined.
+   * @param context the task attempt context
+   * @return the committer to use
+   * @throws IOException problems instantiating the committer
+   */
+  public static PathOutputCommitter createCommitter(Path outputPath,
+      TaskAttemptContext context) throws IOException {
+    return getCommitterFactory(outputPath,
+        context.getConfiguration())
+        .createOutputCommitter(outputPath, context);
+  }
+
+}
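A sketch of the resolution order implemented above — explicit factory, then scheme-specific factory, then the file committer default. The hdfs factory classname is a placeholder, so the lookup is deliberately not invoked while only that binding is set:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory;

    public class FactoryLookupExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration(false);
        Path dest = new Path("hdfs://nn:8020/output");

        // 1. nothing set: falls back to FileOutputCommitterFactory
        PathOutputCommitterFactory.getCommitterFactory(dest, conf);

        // 2. a scheme-specific binding is picked up for hdfs:// paths
        conf.set("mapreduce.outputcommitter.factory.scheme.hdfs",
            "org.example.HdfsCommitterFactory");   // placeholder classname

        // 3. an explicit factory overrides any scheme binding
        conf.set(PathOutputCommitterFactory.COMMITTER_FACTORY_CLASS,
            PathOutputCommitterFactory.NAMED_COMMITTER_FACTORY);
        PathOutputCommitterFactory.getCommitterFactory(dest, conf);
      }
    }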
http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index 9d166c7..1e432ce 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -2043,4 +2043,26 @@
   <name>mapreduce.job.send-token-conf</name>
   <value></value>
 </property>
+
+<property>
+  <description>
+    The name of an output committer factory for MRv2 FileOutputFormat to use
+    for committing work. If set, overrides any per-filesystem committer
+    defined for the destination filesystem.
+  </description>
+  <name>mapreduce.outputcommitter.factory.class</name>
+  <value></value>
+</property>
+
+
+<property>
+  <name>mapreduce.outputcommitter.factory.scheme.s3a</name>
+  <value>org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory</value>
+  <description>
+    The committer factory to use when writing data to S3A filesystems.
+    If mapreduce.outputcommitter.factory.class is set, it will
+    override this property.
+  </description>
+</property>
+
 </configuration>
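With these defaults in place, the committer follows the destination filesystem. A sketch, assuming hadoop-aws (and so S3ACommitterFactory) is on the classpath:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory;

    public class DefaultBindingExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // resolves to S3ACommitterFactory via the scheme.s3a default above
        PathOutputCommitterFactory s3a = PathOutputCommitterFactory
            .getCommitterFactory(new Path("s3a://bucket/output"), conf);
        // resolves to the default FileOutputCommitterFactory
        PathOutputCommitterFactory hdfs = PathOutputCommitterFactory
            .getCommitterFactory(new Path("hdfs://nn:8020/output"), conf);
      }
    }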
http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestPathOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestPathOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestPathOutputCommitter.java
index 9cff82f..3b73934 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestPathOutputCommitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestPathOutputCommitter.java
@@ -109,14 +109,34 @@ public class TestPathOutputCommitter extends Assert {
     public void abortTask(TaskAttemptContext taskContext) throws IOException {
     }
+
+    @Override
+    public Path getOutputPath() {
+      return null;
+    }
   }
 
   /**
    * Stub task context.
+   * The {@link #getConfiguration()} method returns the configuration supplied
+   * in the constructor, while {@link #setOutputCommitter(OutputCommitter)}
+   * sets the committer returned by {@link #getOutputCommitter()}.
+   * Otherwise, the methods are all no-ops.
    */
-  public class TaskContext
+  public static class TaskContext
       implements TaskInputOutputContext<String, String, String, String> {
 
+    private final Configuration configuration;
+
+    public TaskContext() {
+      this(new Configuration());
+    }
+
+    public TaskContext(Configuration conf) {
+      this.configuration = conf;
+    }
+
     private OutputCommitter outputCommitter;
 
     public void setOutputCommitter(OutputCommitter outputCommitter) {
@@ -180,7 +200,7 @@ public class TestPathOutputCommitter extends Assert {
 
     @Override
     public Configuration getConfiguration() {
-      return null;
+      return configuration;
     }
 
     @Override