[ 
https://issues.apache.org/jira/browse/HADOOP-19047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17813793#comment-17813793
 ] 

ASF GitHub Bot commented on HADOOP-19047:
-----------------------------------------

steveloughran commented on code in PR #6468:
URL: https://github.com/apache/hadoop/pull/6468#discussion_r1476480590


##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java:
##########
@@ -52,6 +52,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.annotation.Nullable;
 
+import org.apache.hadoop.fs.s3a.commit.magic.InMemoryMagicCommitTracker;

Review Comment:
   Move this into the same group as the rest of the Apache imports.



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/S3MagicCommitTracker.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.magic;
+
+import org.apache.commons.lang3.StringUtils;

Review Comment:
   Fix the import ordering and grouping. It is best to configure the IDE for 
the project code style, but don't rearrange imports in existing classes, as 
that breaks cherry-picking.



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java:
##########
@@ -248,6 +236,80 @@ private PendingSet innerCommitTask(
     return pendingSet;
   }
 
+  /**
+   * Loads pending commits from either memory or from the remote store (S3) based on the config.
+   * @param context TaskAttemptContext
+   * @return All pending commit data for the given TaskAttemptContext
+   * @throws IOException
+   *           if there is an error trying to read the commit data
+   */
+  protected PendingSet loadPendingCommits(TaskAttemptContext context) throws IOException {
+    PendingSet pendingSet = new PendingSet();
+    if (isTrackMagicCommitsInMemoryEnabled(context.getConfiguration())) {
+      // load from memory
+      List<SinglePendingCommit> pendingCommits = loadPendingCommitsFromMemory(context);
+
+      for (SinglePendingCommit singleCommit : pendingCommits) {
+        // aggregate stats
+        pendingSet.getIOStatistics()
+            .aggregate(singleCommit.getIOStatistics());
+        // then clear so they aren't marshalled again.
+        singleCommit.getIOStatistics().clear();
+      }
+      pendingSet.setCommits(pendingCommits);
+    } else {
+      // Load from remote store
+      CommitOperations actions = getCommitOperations();
+      Path taskAttemptPath = getTaskAttemptPath(context);
+      try (CommitContext commitContext = initiateTaskOperation(context)) {
+        Pair<PendingSet, List<Pair<LocatedFileStatus, IOException>>> loaded =
+            actions.loadSinglePendingCommits(taskAttemptPath, true, commitContext);
+        pendingSet = loaded.getKey();
+        List<Pair<LocatedFileStatus, IOException>> failures = loaded.getValue();
+        if (!failures.isEmpty()) {
+          // At least one file failed to load
+          // revert all which did; report failure with first exception
+          LOG.error("At least one commit file could not be read: failing");
+          abortPendingUploads(commitContext, pendingSet.getCommits(), true);
+          throw failures.get(0).getValue();
+        }
+      }
+    }
+    return pendingSet;
+  }
+
+  private List<SinglePendingCommit> loadPendingCommitsFromMemory(TaskAttemptContext context)

Review Comment:
   nit: add javadocs.



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java:
##########
@@ -71,6 +79,26 @@ public void setup() throws Exception {
     CommitUtils.verifyIsMagicCommitFS(getFileSystem());
   }
 
+  @Parameterized.Parameters(name = "track-commit-in-memory-{0}")
+  public static Collection<Object[]> params() {
+    return Arrays.asList(new Object[][]{
+        {false},
+        {true}
+    });
+  }
+
+  public ITestMagicCommitProtocol(boolean trackCommitsInMemory) {
+    this.trackCommitsInMemory = trackCommitsInMemory;
+  }
+
+  @Override
+  protected Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    conf.setBoolean(FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED, trackCommitsInMemory);

Review Comment:
   Look for uses of `removeBaseAndBucketOverrides()` to see how to stop 
per-bucket settings from breaking your test.
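
To illustrate why this matters, here is a minimal, self-contained sketch. It is not Hadoop code: a plain `Map` stands in for `Configuration`, the option name is hypothetical, and `clearBaseAndBucket` is an illustrative stand-in for the test helper named above. It assumes the usual S3A convention that a `fs.s3a.bucket.BUCKET.*` key overrides the matching `fs.s3a.*` base key.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of per-bucket overrides; a plain Map stands in for Configuration. */
class BucketOverrideSketch {
  static final String PREFIX = "fs.s3a.";
  // Hypothetical option name, used only for this illustration.
  static final String OPTION = PREFIX + "example.track.in.memory";

  /** Per-bucket form of a base key: fs.s3a.bucket.BUCKET.suffix */
  static String bucketKey(String bucket, String baseKey) {
    return PREFIX + "bucket." + bucket + "." + baseKey.substring(PREFIX.length());
  }

  /** Resolve an option so that a per-bucket value wins over the base value. */
  static String resolve(Map<String, String> conf, String bucket, String baseKey) {
    String override = conf.get(bucketKey(bucket, baseKey));
    return override != null ? override : conf.get(baseKey);
  }

  /** Illustrative stand-in for the test helper: drop both forms of each key. */
  static void clearBaseAndBucket(Map<String, String> conf, String bucket, String... keys) {
    for (String key : keys) {
      conf.remove(key);
      conf.remove(bucketKey(bucket, key));
    }
  }

  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<>();
    // A developer's local settings pin the option for their test bucket.
    conf.put(bucketKey("mybucket", OPTION), "false");

    // The test sets only the base key: the per-bucket value still wins.
    conf.put(OPTION, "true");
    System.out.println(resolve(conf, "mybucket", OPTION)); // prints "false"

    // Clearing both forms first lets the test's value take effect.
    clearBaseAndBucket(conf, "mybucket", OPTION);
    conf.put(OPTION, "true");
    System.out.println(resolve(conf, "mybucket", OPTION)); // prints "true"
  }
}
```

In a parameterized test, the equivalent step would be to clear the option (base and per-bucket) in `createConfiguration()` before setting it, so that both parameter values are really exercised regardless of local settings.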



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java:
##########
@@ -3906,6 +3908,21 @@ public void access(final Path f, final FsAction mode)
   @Retries.RetryTranslated
   public FileStatus getFileStatus(final Path f) throws IOException {
     Path path = qualify(f);
+    if (isTrackMagicCommitsInMemoryEnabled(getConf()) && isMagicCommitPath(path)) {

Review Comment:
   This is a bit of a hack. I'm not saying that's bad, just wondering if there 
is a more elegant solution.



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/InMemoryMagicCommitTracker.java:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.magic;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.WriteOperationHelper;
+import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
+import org.apache.hadoop.fs.s3a.statistics.PutTrackerStatistics;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
+import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
+import software.amazon.awssdk.services.s3.model.CompletedPart;

Review Comment:
   Review the import ordering and grouping.



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java:
##########
@@ -264,9 +326,14 @@ public void abortTask(TaskAttemptContext context) throws IOException {
     try (DurationInfo d = new DurationInfo(LOG,
         "Abort task %s", context.getTaskAttemptID());
         CommitContext commitContext = initiateTaskOperation(context)) {
-      getCommitOperations().abortAllSinglePendingCommits(attemptPath,
-          commitContext,
-          true);
+      if (isTrackMagicCommitsInMemoryEnabled(context.getConfiguration())) {
+        List<SinglePendingCommit> pendingCommits = loadPendingCommitsFromMemory(context);
+        for (SinglePendingCommit singleCommit : pendingCommits) {
+          commitContext.abortSingleCommit(singleCommit);
+        }
+      } else {
+        getCommitOperations().abortAllSinglePendingCommits(attemptPath, commitContext, true);

Review Comment:
   Hmm, this is trouble: `abortTask` may be called from the job process 
rather than from the task attempt. For example, the task attempt (TA) is 
considered failed and the job instance calls abort. We can do that today 
because a list will find all pending uploads. With in-memory tracking, that 
ability is gone.
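
The concern above can be shown with a small, self-contained sketch. It is not the PR's code: `PendingUploads` and the attempt and upload IDs are hypothetical, and each separate `PendingUploads` instance models either the shared object store or the private memory of one JVM.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model: which process can see a task attempt's pending uploads. */
class AbortVisibilitySketch {

  /** Hypothetical record of pending uploads, keyed by task attempt ID. */
  static final class PendingUploads {
    private final Map<String, List<String>> byAttempt = new HashMap<>();

    void add(String attemptId, String uploadId) {
      byAttempt.computeIfAbsent(attemptId, k -> new ArrayList<>()).add(uploadId);
    }

    List<String> find(String attemptId) {
      return byAttempt.getOrDefault(attemptId, Collections.emptyList());
    }
  }

  public static void main(String[] args) {
    PendingUploads store = new PendingUploads();      // shared: stands in for S3
    PendingUploads taskMemory = new PendingUploads(); // private to the task JVM
    PendingUploads jobMemory = new PendingUploads();  // private to the job JVM

    // Store-based tracking: the task attempt records its upload in the store.
    store.add("attempt_0", "upload-1");
    // In-memory tracking: the record exists only inside the task JVM.
    taskMemory.add("attempt_0", "upload-2");

    // The job process aborts the failed attempt.
    System.out.println(store.find("attempt_0"));     // prints "[upload-1]"
    System.out.println(jobMemory.find("attempt_0")); // prints "[]"
  }
}
```

Under these assumptions, an abort issued from the job process has nothing to iterate over in the in-memory case, so the upload started by the task attempt would stay pending unless it is discovered some other way.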





> Support InMemory Tracking Of S3A Magic Commits
> ----------------------------------------------
>
>                 Key: HADOOP-19047
>                 URL: https://issues.apache.org/jira/browse/HADOOP-19047
>             Project: Hadoop Common
>          Issue Type: Improvement
>          Components: fs/s3
>            Reporter: Syed Shameerur Rahman
>            Assignee: Syed Shameerur Rahman
>            Priority: Major
>              Labels: pull-request-available
>
> The following operations happen within a task when it uses the S3A 
> Magic Committer. 
> *During closing of stream*
> 1. A 0-byte file with the same name as the original file is uploaded to S3 
> using a PUT operation. See 
> [here|https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java#L152]
>  for more information. This is done so that a downstream application such as 
> Spark can get the size of the file which is being written.
> 2. MultiPartUpload (MPU) metadata is uploaded to S3. See 
> [here|https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java#L176]
>  for more information.
> *During TaskCommit*
> 1. All the MPU metadata which the task wrote to S3 (there will be 'x' 
> metadata files in S3 if a single task writes to 'x' files) is read and 
> rewritten to S3 as a single metadata file. See 
> [here|https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java#L201]
>  for more information.
> Since these operations happen within the task JVM, we could optimize them and 
> save cost by storing this information in memory when task memory usage is 
> not a constraint. Hence the proposal here is to introduce a new MagicCommit 
> tracker called "InMemoryMagicCommitTracker" which will store 
> 1. The metadata of the MPU in memory until the task is committed.
> 2. The size of the file, which can be used by the downstream application 
> to get the file size before it is committed/visible to the output path.
> This optimization will save 2 PUT S3 calls, 1 LIST S3 call, and 1 GET S3 call 
> when a task writes only 1 file.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
