[ https://issues.apache.org/jira/browse/HADOOP-13786?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16003372#comment-16003372 ]

Aaron Fabbri commented on HADOOP-13786:
---------------------------------------

I've been spending some time reviewing the patch.  It's pretty large, so I haven't 
covered 100% of it yet, but here are some comments so far:

{code}
+ * This method was copied from
+ * {@code org.apache.hadoop.registry.client.binding.JsonSerDeser}.
+ * @param <T> Type to marshal.
+ */
{code}

Can we move the dependency to a common place instead of copying?
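
If it helps, even a minimal shared helper in hadoop-common would do. Here's a sketch only; the package and class name are just illustrative, not a prescription:

{code}
package org.apache.hadoop.util; // illustrative location

import java.io.IOException;

import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * Minimal JSON marshalling helper that both the registry binding and the
 * committer could share; a sketch, not the patch's code.
 * @param <T> type to marshal
 */
public class JsonSerialization<T> {
  private final ObjectMapper mapper = new ObjectMapper();
  private final Class<T> classType;

  public JsonSerialization(Class<T> classType) {
    this.classType = classType;
  }

  public String toJson(T instance) throws IOException {
    return mapper.writeValueAsString(instance);
  }

  public T fromJson(String json) throws IOException {
    return mapper.readValue(json, classType);
  }
}
{code}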

{code}
/**
+ * A factory for committers implementing the {@link PathOutputCommitter}
+ * methods, and so can be used from {@link FileOutputFormat}.
+ * The base implementation returns {@link FileOutputFormat} instances.
+ */
+public class PathOutputCommitterFactory extends Configured {
{code}
In the comment, did you mean "FileOutputCommitter instances"?

{code}
           bytes = putObject();
         }
       } else {
-        // there has already been at least one block scheduled for upload;
-        // put up the current then wait
-        if (hasBlock && block.hasData()) {
+        // there's an MPU in progress;
+        // IF there is more data to upload, or no data has yet been uploaded,
+        // PUT the final block
+        if (hasBlock &&
+            (block.hasData() || multiPartUpload.getPartsSubmitted() == 0)) {
{code}
This is to allow for zero-byte files, right?  It looks like you had to make 
similar changes elsewhere.  Did you cover this new behavior with a test case?
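
Something along these lines would pin the zero-byte case down. A sketch only: the test name is invented, and I'm assuming the usual S3A contract-test helpers (getFileSystem(), path()) from the test base classes:

{code}
@Test
public void testZeroByteFileUpload() throws Throwable {
  FileSystem fs = getFileSystem();
  Path path = path("testZeroByteFileUpload");
  // open and close the stream without writing any bytes
  fs.create(path, true).close();
  FileStatus status = fs.getFileStatus(path);
  assertEquals("length of " + path, 0, status.getLen());
}
{code}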

{code}
+  public void abortOutstandingMultipartUploads(long seconds)
+      throws IOException {
+    Preconditions.checkArgument(seconds >= 0);
+    Date purgeBefore =
+        new Date(new Date().getTime() - seconds * 1000);
+    LOG.debug("Purging outstanding multipart uploads older than {}",
+        purgeBefore);
+    try {
+      transfers.abortMultipartUploads(bucket, purgeBefore);
+    } catch (AmazonServiceException e) {
+      throw translateException("purging multipart uploads", bucket, e);
+    }
{code}
I'm thinking about how multipart purge interacts with the magic committer.

Currently someone can set fs.s3a.multipart.purge.age to 0, and the purge could 
then abort the pending (uncommitted) multipart uploads the magic committer 
depends on.  I'm wondering if we should raise the minimum, or log a warning 
when the magic committer is also enabled?  Maybe it at least deserves some 
special treatment in the docs?
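
Even just a warning at init time would help. A sketch; the committer's enable key is invented here since I don't know what it will end up being called, and the one-hour threshold is arbitrary:

{code}
// sketch only: an aggressive purge age can destroy the pending
// (uncommitted) multipart uploads the magic committer depends on
boolean purgeEnabled = conf.getBoolean(PURGE_EXISTING_MULTIPART, false);
long purgeAge = conf.getLong(PURGE_EXISTING_MULTIPART_AGE,
    DEFAULT_PURGE_EXISTING_MULTIPART_AGE);
boolean magicEnabled = conf.getBoolean("fs.s3a.committer.magic.enabled", false);
if (purgeEnabled && magicEnabled && purgeAge < 3600) {
  LOG.warn("fs.s3a.multipart.purge.age={}s may abort in-flight"
      + " magic committer uploads", purgeAge);
}
{code}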

{code}
    * Returns the S3 client used by this filesystem.
    * @return AmazonS3Client
    */
-  AmazonS3 getAmazonS3Client() {
+  public AmazonS3 getAmazonS3Client() {
     return s3;
   }
{code}
Not @VisibleForTesting?  It looks like you just want to subclass/override. Can we 
make it protected instead?
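
That is, something like:

{code}
@VisibleForTesting
protected AmazonS3 getAmazonS3Client() {
  return s3;
}
{code}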

{code}
    * @param key input key
    * @return the fully qualified path including URI scheme and bucket name.
    */
-  Path keyToQualifiedPath(String key) {
+  public Path keyToQualifiedPath(String key) {
{code}

Should we annotate the interface stability or audience here?
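
e.g. something like:

{code}
@InterfaceAudience.Private
@InterfaceStability.Unstable
public Path keyToQualifiedPath(String key) {
  // body unchanged
}
{code}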

{code}
      * @return the upload result containing the ID
      * @throws IOException IO problem
      */
-    String initiateMultiPartUpload() throws IOException {
+    public String initiateMultiPartUpload() throws IOException {
       LOG.debug("Initiating Multipart upload");
       final InitiateMultipartUploadRequest initiateMPURequest =
           new InitiateMultipartUploadRequest(bucket,
{code}
Audience annotation here too (as with keyToQualifiedPath above)?

{code}
+    /**
+     * Finalize a multipart PUT operation.
+     * This completes the upload, and, if that works, calls
+     * {@link #finishedWrite(String, long)} to update the filesystem.
+     * @param uploadId multipart operation Id
+     * @param partETags list of partial uploads
+     * @return the result of the operation.
+     * @throws IOException on problems.
+     */
{code}
Semi-related: I've been thinking of splitting up S3AFileSystem in the
future.  It seems like we could separate out WriteOperationHelper, and maybe put
other S3-level operations (i.e. anything below the level that cares about
MetadataStore) into a separate class.

{code}
+    public int abortMultipartUploadsUnderPath(String prefix)
+        throws IOException {
+      int count = 0;
+      for (MultipartUpload upload : listMultipartUploads(prefix)) {
+        try {
+          abortMultipartCommit(upload);
+          count++;
+        } catch (FileNotFoundException e) {
+          LOG.debug("Already aborted: {}", upload.getKey(), e);
{code}
I wonder if we'll need to deal with retryable exceptions here, instead of
propagating them.
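
A sketch of what I have in mind; the retry bound is invented (MAX_ABORT_ATTEMPTS), and which exceptions count as retryable is an open question:

{code}
for (MultipartUpload upload : listMultipartUploads(prefix)) {
  for (int attempt = 1; ; attempt++) {
    try {
      abortMultipartCommit(upload);
      count++;
      break;
    } catch (FileNotFoundException e) {
      LOG.debug("Already aborted: {}", upload.getKey(), e);
      break;
    } catch (IOException e) {
      // sketch only: bounded retries on transient failures before propagating
      if (attempt >= MAX_ABORT_ATTEMPTS) {
        throw e;
      }
      LOG.debug("Retrying abort of {} after attempt {}",
          upload.getKey(), attempt, e);
    }
  }
}
{code}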

{code}
+     * @return the upload initiated
+     * @throws IOException on problems
+     */
+    public PutObjectResult putObjectAndFinalize(
+        PutObjectRequest putObjectRequest,
+        int length) throws IOException {
+      PutObjectResult result = putObject(putObjectRequest);
+      finishedWrite(putObjectRequest.getKey(), length);
{code}

I'm also wondering about error handling / retries here.  I don't know if there
is a difference between a direct PUT and TransferManager in terms of built-in
retry logic.
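
FWIW, both code paths end up on the same AmazonS3 client, so the SDK-level retry count S3A already configures should apply to both; paraphrasing the client setup from memory, which may not match the exact code:

{code}
// both direct PUTs and TransferManager share this client, so the
// SDK's built-in retry count applies to both paths
ClientConfiguration awsConf = new ClientConfiguration();
awsConf.setMaxErrorRetry(conf.getInt(MAX_ERROR_RETRIES,
    DEFAULT_MAX_ERROR_RETRIES)); // fs.s3a.attempts.maximum
{code}

So the open question is more about what retry/recovery we layer on top for the commit protocol itself.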

{code}
+    /**
+     * Revert a commit by deleting the file.
+     * TODO: Policy regarding creating a mock empty parent directory.
+     * @param destKey destination key
+     * @throws IOException due to inability to delete a directory or file.
+     */
+    public void revertCommit(String destKey) throws IOException {
{code}

Does the directory need to exist after revert?  Can we just say it doesn't?

{code}
+  /**
+   * Delete a path quietly: failures are logged at DEBUG.
+   * @param fs filesystem
+   * @param path path
+   * @param recursive recursive?
+   */
+  public static void deleteWithWarning(FileSystem fs,
{code}
nit: s/quietly: failures are logged at DEBUG/: failures are logged at WARN/ 
(the method is named deleteWithWarning, so the javadoc should match).

{code}
+public abstract class Abstract3GuardCommitterFactory
{code}
"AbstractS3GuardCommitterFactory" (Missing the 'S')

{code}
+  public static List<String> splitPathToElements(Path path) {
+    String uriPath = path.toUri().getPath();
+    checkArgument(!uriPath.isEmpty(), "empty path");
+    checkArgument(uriPath.charAt(0) == '/', "path is relative");
+    if ("/".equals(uriPath)) {
+      // special case: empty list
+      return new ArrayList<>(0);
+    }
+    path.depth();
{code}
Meant to delete this last line?

{code}
+  /**
+   * Verify that that path is a delayed commit path.
{code}
nit: s/that that/that the/, or just drop the second "that".

{code}
+
+/**
+ * Adds the code needed for S3A integration.
+ * It's pulled out to keep S3A FS class slightly less complex.
{code}
Thanks for the separation here.

{code}
+ * Configuration options:
+ * <pre>
+ *   : temporary local FS directory
+ *   : intermediate directory on a cluster-wide FS (can be HDFS or a consistent
+ *   s3 endpoint).
{code}
Couldn't parse this comment.  Is there supposed to be a config name before the 
':'?

{code}
+  public void testConcurrentCommitTaskWithSubDir() throws Exception {
+    Job job = newJob();
+    FileOutputFormat.setOutputPath(job, outDir);
+    final Configuration conf = job.getConfiguration();
+/*
+
+    conf.setClass("fs.file.impl", RLFS.class, FileSystem.class);
+    FileSystem.closeAll();
+*/
{code}
There are a couple of spots that still have commented-out code and TODOs.  Note 
to clean these up before commit.

{code}
+    // if this fails with "directory is already locked" set umask to 0022
+    cluster = new MiniDFSCluster(conf, 1, true, null);
+    //cluster = new MiniDFSCluster.Builder(new Configuration()).build();
{code}
ditto.

{code}
+/*
+  @Override
+  protected void deleteObject(String key) throws InvalidRequestException {
+    // force into sharing the existing mock entry point
+    getAmazonS3Client().deleteObject(new DeleteObjectRequest(getBucket(), 
key));
+  }
+*/
{code}
same.

{code}
+
+    @Override
+    public void setFaults(Faults... faults) {
+      injection.setFaults(faults);
{code}
I only saw this used once, in AbstractITCommitProtocol, which sets a
commitJob failure.  Curious to hear your plans for more fault injection.  Hope 
we can discuss in a call soon.

Finally, a reminder that your log4j.properties diff will need to be commented 
out or removed eventually.  The settings here are useful for testing and dev, 
though.


> Add S3Guard committer for zero-rename commits to consistent S3 endpoints
> ------------------------------------------------------------------------
>
>                 Key: HADOOP-13786
>                 URL: https://issues.apache.org/jira/browse/HADOOP-13786
>             Project: Hadoop Common
>          Issue Type: New Feature
>          Components: fs/s3
>    Affects Versions: HADOOP-13345
>            Reporter: Steve Loughran
>            Assignee: Steve Loughran
>         Attachments: HADOOP-13786-HADOOP-13345-001.patch, 
> HADOOP-13786-HADOOP-13345-002.patch, HADOOP-13786-HADOOP-13345-003.patch, 
> HADOOP-13786-HADOOP-13345-004.patch, HADOOP-13786-HADOOP-13345-005.patch, 
> HADOOP-13786-HADOOP-13345-006.patch, 
> HADOOP-13786-HADOOP-13345-007.patch, HADOOP-13786-HADOOP-13345-009.patch, 
> HADOOP-13786-HADOOP-13345-010.patch, HADOOP-13786-HADOOP-13345-011.patch, 
> HADOOP-13786-HADOOP-13345-012.patch, HADOOP-13786-HADOOP-13345-013.patch, 
> HADOOP-13786-HADOOP-13345-015.patch, HADOOP-13786-HADOOP-13345-016.patch, 
> HADOOP-13786-HADOOP-13345-017.patch, HADOOP-13786-HADOOP-13345-018.patch, 
> HADOOP-13786-HADOOP-13345-019.patch, HADOOP-13786-HADOOP-13345-020.patch, 
> HADOOP-13786-HADOOP-13345-021.patch, HADOOP-13786-HADOOP-13345-022.patch, 
> HADOOP-13786-HADOOP-13345-023.patch, HADOOP-13786-HADOOP-13345-024.patch, 
> HADOOP-13786-HADOOP-13345-025.patch, objectstore.pdf, s3committer-master.zip
>
>
> A goal of this code is "support O(1) commits to S3 repositories in the 
> presence of failures". Implement it, including whatever is needed to 
> demonstrate the correctness of the algorithm. (that is, assuming that s3guard 
> provides a consistent view of the presence/absence of blobs, show that we can 
> commit directly).
> I consider ourselves free to expose the blobstore-ness of the s3 output 
> streams (ie. not visible until the close()), if we need to use that to allow 
> us to abort commit operations.


