[
https://issues.apache.org/jira/browse/HADOOP-14971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16215400#comment-16215400
]
ASF GitHub Bot commented on HADOOP-14971:
-----------------------------------------
Github user rdblue commented on a diff in the pull request:
https://github.com/apache/hadoop/pull/282#discussion_r146322062
--- Diff:
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
---
@@ -0,0 +1,908 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.staging;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.commit.AbstractS3GuardCommitter;
+import org.apache.hadoop.fs.s3a.commit.CommitConstants;
+import org.apache.hadoop.fs.s3a.commit.DurationInfo;
+import org.apache.hadoop.fs.s3a.commit.Tasks;
+import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
+import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+
+import static com.google.common.base.Preconditions.*;
+import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.apache.hadoop.fs.s3a.S3AUtils.*;
+import static
org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterConstants.*;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
+import static org.apache.hadoop.fs.s3a.commit.CommitUtils.*;
+
+/**
+ * Committer based on the contributed work of the
+ * <a href="https://github.com/rdblue/s3committer">Netflix multipart committers</a>.
+ * <ol>
+ * <li>
+ * The working directory of each task is actually under a temporary
+ * path in the local filesystem; jobs write directly into it.
+ * </li>
+ * <li>
+ * Task Commit: list all files under the task working dir, upload
+ * each of them but do not commit the final operation.
+ * Persist the information for each pending commit into the cluster
+ * for enumeration and commit by the job committer.
+ * </li>
+ * <li>Task Abort: recursive delete of task working dir.</li>
+ * <li>Job Commit: list all pending PUTs to commit; commit them.</li>
+ * <li>
+ * Job Abort: list all pending PUTs to commit; abort them.
+ * Delete all task attempt directories.
+ * </li>
+ * </ol>
+ */
public class StagingCommitter extends AbstractS3GuardCommitter {

  private static final Logger LOG = LoggerFactory.getLogger(
      StagingCommitter.class);

  /** Committer name, as returned by {@code getName()}. */
  public static final String NAME = "StagingCommitter";

  /** Output path exactly as passed to the constructor. */
  private final Path constructorOutputPath;

  /** Multipart upload part size, from {@code MULTIPART_SIZE}. */
  private final long uploadPartSize;

  /** Upload UUID; resolved from committer/Spark configuration or job ID. */
  private final String uuid;

  /** Flag from {@code FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES}. */
  private final boolean uniqueFilenames;

  /** Inner committer which manages work in the (consistent) cluster FS. */
  private final FileOutputCommitter wrappedCommitter;

  // NOTE(review): the constructor evaluates the resolution mode but assigns
  // only a local; presumably getConflictResolutionMode() caches into this
  // field — confirm in its definition (not visible in this diff).
  private ConflictResolution conflictResolution;

  /** Final destination path; set from the constructor output path. */
  private final Path finalOutputPath;

  /** S3 key of the final output path, via S3AFileSystem.pathToKey(). */
  private String s3KeyPrefix = null;

  /** The directory in the cluster FS for commits to go to. */
  private Path commitsDirectory;
+
  /**
   * Committer for a single task attempt.
   * Resolves the upload UUID, the local working path and the wrapped
   * cluster-FS committer, then fixes the final output path and its S3 key.
   * @param outputPath final output path
   * @param context task context
   * @throws IOException on a failure
   */
  public StagingCommitter(Path outputPath,
      TaskAttemptContext context) throws IOException {
    super(outputPath, context);
    // the superclass constructor may have qualified/altered the path,
    // so re-read it rather than trusting the raw argument.
    this.constructorOutputPath = checkNotNull(getOutputPath(), "output path");
    Configuration conf = getConf();
    this.uploadPartSize = conf.getLongBytes(
        MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE);

    // Spark will use a fake app ID based on the current minute and job ID 0.
    // To avoid collisions, use the YARN application ID for Spark.
    this.uuid = getUploadUUID(conf, context.getJobID());
    this.uniqueFilenames = conf.getBoolean(
        FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES,
        DEFAULT_STAGING_COMMITTER_UNIQUE_FILENAMES);
    // work path is in the local FS; null if this is not a task-side context.
    setWorkPath(buildWorkPath(context, uuid));
    this.wrappedCommitter = createWrappedCommitter(context, conf);
    // forces evaluation and caching of the resolution mode.
    ConflictResolution mode = getConflictResolutionMode(getJobContext());
    LOG.debug("Conflict resolution mode: {}", mode);
    this.finalOutputPath = constructorOutputPath;
    Preconditions.checkNotNull(finalOutputPath, "Output path cannot be null");
    S3AFileSystem fs = getS3AFileSystem(finalOutputPath,
        context.getConfiguration(), false);
    s3KeyPrefix = fs.pathToKey(finalOutputPath);
    LOG.debug("{}: final output path is {}", getRole(), finalOutputPath);
    setOutputPath(finalOutputPath);
  }
+
+ @Override
+ public String getName() {
+ return NAME;
+ }
+
  /**
   * Create the wrapped committer.
   * This includes customizing its options, and setting up the destination
   * directory.
   * @param context job/task context.
   * @param conf config
   * @return the inner committer
   * @throws IOException on a failure
   * @throws PathIOException if the commits directory resolves to S3
   */
  protected FileOutputCommitter createWrappedCommitter(JobContext context,
      Configuration conf) throws IOException {

    // explicitly choose commit algorithm
    initFileOutputCommitterOptions(context);
    commitsDirectory = Paths.getMultipartUploadCommitsDirectory(conf, uuid);
    FileSystem stagingFS = commitsDirectory.getFileSystem(conf);
    // qualify before the type check so the exception message names the
    // fully-resolved URI.
    Path qualified = stagingFS.makeQualified(commitsDirectory);
    if (stagingFS instanceof S3AFileSystem) {
      // currently refuse to work with S3a for the working FS; you need
      // a consistent FS. This isn't entirely true with s3guard and
      // alternative S3 endpoints, but doing it now stops
      // accidental use of S3
      throw new PathIOException(qualified.toUri().toString(),
          "Directory for intermediate work cannot be on S3");
    }
    return new FileOutputCommitter(qualified, context);
  }
+
+ /**
+ * Init the context config with everything needed for the file output
+ * committer. In particular, this code currently only works with
+ * commit algorithm 1.
+ * @param context context to configure.
+ */
+ protected void initFileOutputCommitterOptions(JobContext context) {
+ context.getConfiguration()
+ .setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
1);
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder("StagingCommitter{");
+ sb.append(super.toString());
+ sb.append(", finalOutputPath=").append(finalOutputPath);
+ sb.append(", conflictResolution=").append(conflictResolution);
+ if (wrappedCommitter != null) {
+ sb.append(", wrappedCommitter=").append(wrappedCommitter);
+ }
+ sb.append('}');
+ return sb.toString();
+ }
+
+ /**
+ * Get the UUID of an upload; may be the job ID.
+ * @param conf job/task configuration
+ * @param jobId Job ID
+ * @return an ID for use in paths.
+ */
+ public static String getUploadUUID(Configuration conf, String jobId) {
+ return conf.getTrimmed(FS_S3A_COMMITTER_STAGING_UUID,
+ conf.get(SPARK_WRITE_UUID,
+ conf.getTrimmed(SPARK_APP_ID, jobId)));
+ }
+
+ /**
+ * Get the UUID of an upload; may be the job ID.
+ * @param conf job/task configuration
+ * @param jobId Job ID
+ * @return an ID for use in paths.
+ */
+ public static String getUploadUUID(Configuration conf, JobID jobId) {
+ return getUploadUUID(conf, jobId.toString());
+ }
+
+ /**
+ * Get the work path for a task.
+ * @param context job/task complex
+ * @param uuid UUID
+ * @return a path or null if the context is not of a task
+ * @throws IOException failure to build the path
+ */
+ private static Path buildWorkPath(JobContext context, String uuid)
+ throws IOException {
+ if (context instanceof TaskAttemptContext) {
+ return taskAttemptWorkingPath((TaskAttemptContext) context, uuid);
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * The staging committers do not require "magic" commit support.
+ * @return false
+ */
+ @Override
+ protected boolean isMagicFileSystemRequired() {
--- End diff --
Is it possible to get rid of this? I never remember what "magic" refers to.
I'd recommend either a better name or removing it from the API.
> Merge S3A committers into trunk
> -------------------------------
>
> Key: HADOOP-14971
> URL: https://issues.apache.org/jira/browse/HADOOP-14971
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/s3
> Affects Versions: 3.0.0
> Reporter: Steve Loughran
> Assignee: Steve Loughran
>
> Merge the HADOOP-13786 committer into trunk. This branch is being set up as a
> github PR for review there & to keep it out the mailboxes of the watchers on
> the main JIRA
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]