[
https://issues.apache.org/jira/browse/HADOOP-14971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16219182#comment-16219182
]
ASF GitHub Bot commented on HADOOP-14971:
-----------------------------------------
Github user steveloughran commented on a diff in the pull request:
https://github.com/apache/hadoop/pull/282#discussion_r146936337
--- Diff:
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
---
@@ -0,0 +1,908 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.staging;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.commit.AbstractS3GuardCommitter;
+import org.apache.hadoop.fs.s3a.commit.CommitConstants;
+import org.apache.hadoop.fs.s3a.commit.DurationInfo;
+import org.apache.hadoop.fs.s3a.commit.Tasks;
+import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
+import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+
+import static com.google.common.base.Preconditions.*;
+import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.apache.hadoop.fs.s3a.S3AUtils.*;
+import static
org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterConstants.*;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
+import static org.apache.hadoop.fs.s3a.commit.CommitUtils.*;
+
+/**
+ * Committer based on the contributed work of the
+ * <a href="https://github.com/rdblue/s3committer">Netflix multipart
committers.</a>
+ * <ol>
+ * <li>
+ * The working directory of each task is actually under a temporary
+ * path in the local filesystem; jobs write directly into it.
+ * </li>
+ * <li>
+ * Task Commit: list all files under the task working dir, upload
+ * each of them but do not commit the final operation.
+ * Persist the information for each pending commit into the cluster
+ * for enumeration and commit by the job committer.
+ * </li>
+ * <li>Task Abort: recursive delete of task working dir.</li>
+ * <li>Job Commit: list all pending PUTs to commit; commit them.</li>
+ * <li>
+ * Job Abort: list all pending PUTs to commit; abort them.
+ * Delete all task attempt directories.
+ * </li>
+ * </ol>
+ */
+public class StagingCommitter extends AbstractS3GuardCommitter {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ StagingCommitter.class);
+ public static final String NAME = "StagingCommitter";
+ private final Path constructorOutputPath;
+ private final long uploadPartSize;
+ private final String uuid;
+ private final boolean uniqueFilenames;
+ private final FileOutputCommitter wrappedCommitter;
+
+ private ConflictResolution conflictResolution;
+ private final Path finalOutputPath;
+ private String s3KeyPrefix = null;
+
+ /** The directory in the cluster FS for commits to go to. */
+ private Path commitsDirectory;
+
+ /**
+ * Committer for a single task attempt.
+ * @param outputPath final output path
+ * @param context task context
+ * @throws IOException on a failure
+ */
+ public StagingCommitter(Path outputPath,
+ TaskAttemptContext context) throws IOException {
+ super(outputPath, context);
+ this.constructorOutputPath = checkNotNull(getOutputPath(), "output
path");
+ Configuration conf = getConf();
+ this.uploadPartSize = conf.getLongBytes(
+ MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE);
+
+ // Spark will use a fake app ID based on the current minute and job ID
0.
+ // To avoid collisions, use the YARN application ID for Spark.
+ this.uuid = getUploadUUID(conf, context.getJobID());
+ this.uniqueFilenames = conf.getBoolean(
+ FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES,
+ DEFAULT_STAGING_COMMITTER_UNIQUE_FILENAMES);
+ setWorkPath(buildWorkPath(context, uuid));
+ this.wrappedCommitter = createWrappedCommitter(context, conf);
+ // forces evaluation and caching of the resolution mode.
+ ConflictResolution mode = getConflictResolutionMode(getJobContext());
+ LOG.debug("Conflict resolution mode: {}", mode);
+ this.finalOutputPath = constructorOutputPath;
--- End diff --
Update: next revision will
1. eliminate `finalOutputPath` as a field
1. call `setOutputPath(finalOutputPath)` in constructor
1. then call `getOutputPath()` and use it in the constructor to set up the
destFS, which is always fetched with `getDestFS()`
1. And all uses of `getOutputPath(JobContext)` are gone (in job setup and
commit); expect the committer to be set up with the correct output path.
This should mean that whatever sets the output path will define that output
path; it should be overrideable, provided that the constructor-invoked methods
don't cause problems before a subclass is initialized. Will that work?
> Merge S3A committers into trunk
> -------------------------------
>
> Key: HADOOP-14971
> URL: https://issues.apache.org/jira/browse/HADOOP-14971
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/s3
> Affects Versions: 3.0.0
> Reporter: Steve Loughran
> Assignee: Steve Loughran
>
> Merge the HADOOP-13786 committer into trunk. This branch is being set up as a
> github PR for review there & to keep it out the mailboxes of the watchers on
> the main JIRA
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]