[
https://issues.apache.org/jira/browse/HADOOP-19047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17814175#comment-17814175
]
ASF GitHub Bot commented on HADOOP-19047:
-----------------------------------------
shameersss1 commented on PR #6468:
URL: https://github.com/apache/hadoop/pull/6468#issuecomment-1926304528
@steveloughran - Thanks a lot for the detailed review and some great
questions. The following are my thoughts on the different asks.
> 1. Marker files at the end of each path so that spark status reporting on
different processes can get an update on an active job.
As far as I know (please correct me if I am wrong):
1. The 0-byte marker file was added specifically for Spark's use case.
2. After writing files, Spark queries the size of the written files for
statistics purposes (e.g. showing the output bytes written in the Spark
History Server UI).
3. This is done as part of the
[BasicWriteStatsTracker](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala#L86)
class in Spark.
4. As far as I could see in my experiments, BasicWriteStatsTracker#getFileSize is
called in the executor process itself.
That being said, since the same process is calling
BasicWriteStatsTracker#getFileSize, is it still required to have the 0-byte marker file?
I have solved this by adding a check in the getFileStatus() path that returns the
file size corresponding to the magic path/file.
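To make that concrete, here is a rough sketch of the idea (field and method names are placeholders, not the exact ones in the PR): when the queried path is a tracked magic file, a FileStatus is synthesized from the bytes written so far instead of issuing a HEAD against S3.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;

// Hedged sketch only; illustrative names, not the PR's actual implementation.
class MagicFileSizeLookupSketch {
  // magic path of the in-flight file -> bytes written so far
  private static final Map<Path, Long> PATH_TO_BYTES_WRITTEN =
      new ConcurrentHashMap<>();

  static void recordBytesWritten(Path magicPath, long length) {
    PATH_TO_BYTES_WRITTEN.put(magicPath, length);
  }

  /** Returns a synthesized FileStatus if the path is tracked, else null. */
  static FileStatus statusIfTracked(Path magicPath) {
    Long length = PATH_TO_BYTES_WRITTEN.get(magicPath);
    if (length == null) {
      return null; // caller falls back to the normal S3 HEAD request
    }
    // Only the length matters to BasicWriteStatsTracker#getFileSize;
    // block size and modification time here are placeholders.
    return new FileStatus(length, false, 1, 0, System.currentTimeMillis(), magicPath);
  }
}
```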
------
> 2. A way to abort all uploads of a failed task attempt -even from a
different process. Probably also a way to abort the entire job.
Thinking from Spark's perspective:
1. When a task attempt fails (gracefully), the
[abortTask](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala#L176)
operation is called. This operation runs within the same process, and
hence we can fetch the MPU metadata from memory itself.
2. If a task attempt fails ungracefully and all retries are exhausted, the
[abortJob](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala#L69)
operation is called, which internally invokes
[cleanup](https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java#L965),
which lists all the pending multipart uploads and aborts them.
That being said, I am not sure if there is any use case for aborting a task
from another process. In such cases, I guess abortJob will handle it.
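To illustrate the same-process case, a hedged sketch (the `UploadAborter` interface and field names are stand-ins for the real S3A helpers): abortTask can drain the in-memory map and abort each tracked upload directly, while an out-of-process failure still relies on abortJob's list-and-abort cleanup.

```java
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hedged sketch only; names are illustrative, not the PR's actual code.
class SameProcessAbortSketch {
  // taskAttemptId -> uploadIds of multipart uploads started by that attempt
  private static final Map<String, List<String>> TASK_TO_UPLOAD_IDS =
      new ConcurrentHashMap<>();

  /** Stand-in for whatever S3A helper issues AbortMultipartUpload. */
  interface UploadAborter {
    void abort(String uploadId) throws IOException;
  }

  /** abortTask in the same JVM: abort from memory, no S3 LIST needed. */
  static void abortTaskAttempt(String taskAttemptId, UploadAborter aborter)
      throws IOException {
    List<String> uploads = TASK_TO_UPLOAD_IDS.remove(taskAttemptId);
    if (uploads == null) {
      return; // nothing tracked for this attempt in this JVM
    }
    for (String uploadId : uploads) {
      aborter.abort(uploadId);
    }
  }
}
```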
----
> 3. Confidence that the in-memory store of pending uploads will not grow indefinitely.
1. The static map entry is removed during the taskCommit or abortTask
operations, and hence it is guaranteed that there is no memory leak (unless there
is some unexplored corner case).
2. The only case where it grows large is when a large number of
concurrent jobs reuse the same executor JVM. Since we don't enable the
in-memory tracker by default, we should be good. That being said, maybe we should call
this out in the documentation.
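As a hedged illustration of point 1 (placeholder names, not the PR's exact classes): both the commit and abort paths remove the task attempt's entry, so the map only ever holds data for attempts still in flight in this JVM.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hedged sketch only; PendingCommit stands in for the real pending-upload metadata.
class InMemoryLifecycleSketch {
  static class PendingCommit { /* destination, uploadId, part etags, length... */ }

  private static final Map<String, List<PendingCommit>> TASK_TO_COMMITS =
      new ConcurrentHashMap<>();

  /** Called when a magic output stream is closed: track the pending MPU. */
  static void track(String taskAttemptId, PendingCommit commit) {
    TASK_TO_COMMITS
        .computeIfAbsent(taskAttemptId,
            k -> Collections.synchronizedList(new ArrayList<>()))
        .add(commit);
  }

  /** Called from commitTask (abortTask is analogous): drain the entry. */
  static List<PendingCommit> commitTaskAttempt(String taskAttemptId) {
    List<PendingCommit> commits = TASK_TO_COMMITS.remove(taskAttemptId);
    return commits == null ? Collections.emptyList() : commits;
  }
}
```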
----
> 4. Two jobs writing to same path will it corrupt the Map ?
The paths stored here in `private static Map<String, List<Path>>
taskAttemptIdToPath = new ConcurrentHashMap<>();` are the magic paths. Even though
the file name might be the same, the magic path for two different jobs will be
different since the jobId is included in the path.
Does that make sense? Or am I missing anything?
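As a tiny hedged example (the path layout is simplified, not the exact magic path format): the same destination file name written by two jobs produces distinct keys and distinct magic paths, so the entries never collide.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.fs.Path;

public class MagicPathKeyDemo {
  private static final Map<String, List<Path>> taskAttemptIdToPath =
      new ConcurrentHashMap<>();

  public static void main(String[] args) {
    // Both jobs write "part-00000" under the same destination directory,
    // but the jobId appears in both the key and the magic path.
    taskAttemptIdToPath.put("attempt_job_001_m_000000_0",
        Collections.singletonList(new Path(
            "s3a://bucket/out/__magic/job_001/tasks/attempt_job_001_m_000000_0/__base/part-00000")));
    taskAttemptIdToPath.put("attempt_job_002_m_000000_0",
        Collections.singletonList(new Path(
            "s3a://bucket/out/__magic/job_002/tasks/attempt_job_002_m_000000_0/__base/part-00000")));

    System.out.println(taskAttemptIdToPath.size()); // 2 distinct entries, no corruption
  }
}
```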
> Support InMemory Tracking Of S3A Magic Commits
> ----------------------------------------------
>
> Key: HADOOP-19047
> URL: https://issues.apache.org/jira/browse/HADOOP-19047
> Project: Hadoop Common
> Issue Type: Improvement
> Components: fs/s3
> Reporter: Syed Shameerur Rahman
> Assignee: Syed Shameerur Rahman
> Priority: Major
> Labels: pull-request-available
>
> The following are the operations which happen within a Task when it uses the S3A
> Magic Committer.
> *During closing of stream*
> 1. A 0-byte file with the same name as the original file is uploaded to S3
> using a PUT operation. Refer
> [here|https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java#L152]
> for more information. This is done so that downstream applications like
> Spark can get the size of the file being written.
> 2. MultiPartUpload(MPU) metadata is uploaded to S3. Refer
> [here|https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java#L176]
> for more information.
> *During TaskCommit*
> 1. All the MPU metadata which the task wrote to S3 (there will be 'x'
> metadata files in S3 if a single task writes 'x' files) is read and
> rewritten to S3 as a single metadata file. Refer
> [here|https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java#L201]
> for more information
> Since these operations happen within the Task JVM, we could optimize as well
> as save cost by storing this information in memory when Task memory usage is
> not a constraint. Hence the proposal here is to introduce a new
> MagicCommitTracker called "InMemoryMagicCommitTracker" which will store
> 1. The metadata of the MPU in memory till the Task is committed
> 2. The size of the file, which can be used by the downstream application
> to get the file size before it is committed/visible in the output path.
> This optimization will save 2 S3 PUT calls, 1 S3 LIST call, and 1 S3 GET call,
> assuming a Task writes only 1 file.