shameersss1 commented on PR #6468:
URL: https://github.com/apache/hadoop/pull/6468#issuecomment-1926304528

   @steveloughran  - Thanks a lot a for a detailed review and some amazing 
question, The following are my thoughts on the different asks.
   
   > 1. Marker files at the end of each path so that spark status reporting on 
different processes can get an update on an active job.
   
   As far i know (Please correct me if i am wrong)
   1. 0-size marker files was specially added for Spark's use case.
   2. After writing files, Spark tries to get the size of the files written for 
the statistic purpose (like showing the output bytes written) in the Spark 
History server UI.
   3. This operation is being done as part of the 
[BasicWriteStatsTracker](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala#L86)
 class in Spark.
   4. As i could see in my experiment, BasicWriteStatsTracker#getFileSize is 
called in the executor process itself.
   
   That being said, Since the same process is calling 
BasicWriteStatsTracker#getFileSize is it still required to have 0 marker file? 
I have solved this by adding a check in FileStatus method by returing the file 
size corresponding to the magic path/file.
   
   ------
   
   > 2. A way to abort all uploads of a failed task attempt -even from a 
different process. Probably also a way to abort the entire job.
   
   Thinking from Spark's perspective,
   1. When a taskAttempt fails (gracefully), 
[abortTask](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala#L176)
 operation is called. This is operation is called within the same process and 
hence we can fetch the MPU metadata from the memory itself.
   
   2. If a taskAttempt fails (ungracefully and all retries) are exhausted, When 
[abortJob](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala#L69)
 operation is called which will internally invoke 
[cleanup](https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java#L965)
 which lists all the pending multi part upload and aborts them.
   
   That being said, I am not sure if there is any such use case of abortingTask 
from another process. In such cases, The abortJob will handle it i guess.
   
   ----
   
   >3. Confidence that the inner memory store of pending uploads Will not grow 
it definitely.
   
   1. The static map entry is removed is during taskCommit or abortTask 
operations and hence it guaranteed that there is no memory leak (unless there 
is some unexplored corner case).
   
   2. The only case when it grows large, is when there are large number of 
concurrent jobs reusing the same executor JVM, Since we don't enable the 
"inmemory" by default we should be good. That being said, maybe we should call 
this out in the documentation.
   
   ----
   
   > 4. Two jobs writing to same path will it corrupt the Map ?
   
   The paths stored here as part of `private static Map<String, List<Path>> 
taskAttemptIdToPath = new ConcurrentHashMap<>();` is the magic path, Eventhough 
the file name might be same, The magic path for two different jobs will be 
different since the jobId is included in the path.
   
   
   Doe it make sense? Or am i missing anything?
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to