[ 
https://issues.apache.org/jira/browse/HADOOP-19047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17809158#comment-17809158
 ] 

ASF GitHub Bot commented on HADOOP-19047:
-----------------------------------------

steveloughran commented on code in PR #6468:
URL: https://github.com/apache/hadoop/pull/6468#discussion_r1461054737


##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/S3MagicCommitTracker.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.magic;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.Retries;
+import org.apache.hadoop.fs.s3a.S3ADataBlocks;
+import org.apache.hadoop.fs.s3a.WriteOperationHelper;
+import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
+import org.apache.hadoop.fs.s3a.impl.PutObjectOptions;
+import org.apache.hadoop.fs.s3a.statistics.PutTrackerStatistics;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
+import org.apache.hadoop.util.Preconditions;
+import software.amazon.awssdk.services.s3.model.CompletedPart;
+import software.amazon.awssdk.services.s3.model.PutObjectRequest;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_MAGIC_MARKER_PUT;
+import static 
org.apache.hadoop.fs.s3a.commit.CommitConstants.X_HEADER_MAGIC_MARKER;
+import static 
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation;
+
+public class S3MagicCommitTracker extends MagicCommitTracker {
+
+  public S3MagicCommitTracker(Path path,
+      String bucket,
+      String originalDestKey,
+      String destKey,
+      String pendingsetKey,
+      WriteOperationHelper writer,
+      PutTrackerStatistics trackerStatistics) {
+    super(path, bucket, originalDestKey, destKey, pendingsetKey, writer, 
trackerStatistics);
+  }
+
+  @Override
+  public boolean aboutToComplete(String uploadId,
+      List<CompletedPart> parts,
+      long bytesWritten,
+      final IOStatistics iostatistics)
+      throws IOException {
+    Preconditions.checkArgument(StringUtils.isNotEmpty(uploadId),
+        "empty/null upload ID: "+ uploadId);
+    Preconditions.checkArgument(parts != null,
+        "No uploaded parts list");
+    Preconditions.checkArgument(!parts.isEmpty(),
+        "No uploaded parts to save");
+
+    // put a 0-byte file with the name of the original under-magic path
+    // Add the final file length as a header
+    // this is done before the task commit, so its duration can be
+    // included in the statistics
+    Map<String, String> headers = new HashMap<>();
+    headers.put(X_HEADER_MAGIC_MARKER, Long.toString(bytesWritten));
+    PutObjectRequest originalDestPut = writer.createPutObjectRequest(
+        originalDestKey,
+        0,
+        new PutObjectOptions(true, null, headers), false);
+    upload(originalDestPut, new ByteArrayInputStream(EMPTY));
+
+    // build the commit summary
+    SinglePendingCommit commitData = new SinglePendingCommit();
+    commitData.touch(System.currentTimeMillis());
+    commitData.setDestinationKey(getDestKey());
+    commitData.setBucket(bucket);
+    commitData.setUri(path.toUri().toString());
+    commitData.setUploadId(uploadId);
+    commitData.setText("");
+    commitData.setLength(bytesWritten);
+    commitData.bindCommitData(parts);
+    commitData.setIOStatistics(
+        new IOStatisticsSnapshot(iostatistics));
+
+    byte[] bytes = commitData.toBytes(SinglePendingCommit.serializer());

Review Comment:
   you know, the other thing to consider here is moving from json 
serialization; IOStatisticsSnapshot already implements Serializable; adding 
Hadoop Writable to it would make for faster ser/deser and marshalling than 
through jackson





> Support InMemory Tracking Of S3A Magic Commits
> ----------------------------------------------
>
>                 Key: HADOOP-19047
>                 URL: https://issues.apache.org/jira/browse/HADOOP-19047
>             Project: Hadoop Common
>          Issue Type: Improvement
>          Components: fs/s3
>            Reporter: Syed Shameerur Rahman
>            Assignee: Syed Shameerur Rahman
>            Priority: Major
>              Labels: pull-request-available
>
> The following are the operations which happens within a Task when it uses S3A 
> Magic Committer. 
> *During closing of stream*
> 1. A 0-byte file with a same name of the original file is uploaded to S3 
> using PUT operation. Refer 
> [here|https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java#L152]
>  for more information. This is done so that the downstream application like 
> Spark could get the size of the file which is being written.
> 2. MultiPartUpload(MPU) metadata is uploaded to S3. Refer 
> [here|https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java#L176]
>  for more information.
> *During TaskCommit*
> 1. All the MPU metadata which the task wrote to S3 (There will be 'x' number 
> of metadata file in S3 if a single task writes to 'x' files) are read and 
> rewritten to S3 as a single metadata file. Refer 
> [here|https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java#L201]
>  for more information
> Since these operations happens with the Task JVM, We could optimize as well 
> as save cost by storing these information in memory when Task memory usage is 
> not a constraint. Hence the proposal here is to introduce a new MagicCommit 
> Tracker called "InMemoryMagicCommitTracker" which will store the 
> 1. Metadata of MPU in memory till the Task is committed
> 2. Store the size of the file which can be used by the downstream application 
> to get the file size before it is committed/visible to the output path.
> This optimization will save 2 PUT S3 calls, 1 LIST S3 call, and 1 GET S3 call 
> given a Task writes only 1 file.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to