[
https://issues.apache.org/jira/browse/HADOOP-18797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17764341#comment-17764341
]
ASF GitHub Bot commented on HADOOP-18797:
-----------------------------------------
steveloughran commented on code in PR #6006:
URL: https://github.com/apache/hadoop/pull/6006#discussion_r1323354035
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitPaths.java:
##########
@@ -96,7 +101,18 @@ public static boolean containsBasePath(List<String>
elements) {
* @throws IllegalArgumentException if there is no magic element
*/
public static int magicElementIndex(List<String> elements) {
- int index = elements.indexOf(MAGIC);
+ int index = 0;
Review Comment:
actually, you could coalesce this with L114-115 into some method which gets
the index without raising an exception; isMagicPath(elements) calls this and
returns true if the index >= 0. Or you do very fancy java8 Optional stuff...
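A minimal sketch of that coalescing, not the PR's actual code. It assumes a magic element is any path element starting with MAGIC_PATH_PREFIX, and that the prefix is "__magic_job-" (taken from the test description elsewhere in this review). The class name and findMagicElementIndex() are hypothetical.

```java
import java.util.List;
import java.util.OptionalInt;
import java.util.stream.IntStream;

final class MagicIndexSketch {
  // Assumed prefix value; the real constant lives in the hadoop-aws module.
  static final String MAGIC_PATH_PREFIX = "__magic_job-";

  private MagicIndexSketch() {
  }

  /** Hypothetical helper: index of the first magic element, or empty if none. */
  static OptionalInt findMagicElementIndex(List<String> elements) {
    return IntStream.range(0, elements.size())
        .filter(i -> elements.get(i).startsWith(MAGIC_PATH_PREFIX))
        .findFirst();
  }

  /** True if any element is a magic element; never throws. */
  static boolean isMagicPath(List<String> elements) {
    return findMagicElementIndex(elements).isPresent();
  }

  /** Same lookup, but raises IllegalArgumentException when nothing matches. */
  static int magicElementIndex(List<String> elements) {
    return findMagicElementIndex(elements).orElseThrow(
        () -> new IllegalArgumentException(
            "No element with prefix " + MAGIC_PATH_PREFIX + " in path"));
  }
}
```

Both public-facing methods then share one scan, so the matching rule only has to change in one place.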
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitPaths.java:
##########
@@ -208,8 +215,8 @@ public static List<String> finalDestination(List<String>
elements) {
if (isMagicPath(elements)) {
List<String> destDir = magicPathParents(elements);
List<String> children = magicPathChildren(elements);
- checkArgument(!children.isEmpty(), "No path found under " +
- MAGIC);
+ checkArgument(!children.isEmpty(), "No path found under the prefix" +
Review Comment:
needs a space at the end
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/integration/ITestS3ACommitterMRJob.java:
##########
@@ -586,7 +586,7 @@ private MagicCommitterTestBinding() {
protected void validateResult(final Path destPath,
final SuccessData successData)
throws Exception {
- Path magicDir = new Path(destPath, MAGIC);
+ Path magicDir = new Path(destPath, MAGIC_PATH_PREFIX);
Review Comment:
is a job id being passed down here? it should be, for completeness
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitPaths.java:
##########
@@ -76,7 +76,12 @@ public static List<String> splitPathToElements(Path path) {
* @return true if a path is considered magic
*/
public static boolean isMagicPath(List<String> elements) {
- return elements.contains(MAGIC);
+ for (String element : elements) {
Review Comment:
or use some java8 stream thing?
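For illustration, the stream form could look roughly like this. It assumes the loop in the patch tests each element with startsWith(MAGIC_PATH_PREFIX) and that the prefix value is "__magic_job-"; neither is visible in this hunk, and the class name is hypothetical.

```java
import java.util.List;

final class MagicPathStreamSketch {
  // Assumed prefix value, for illustration only.
  static final String MAGIC_PATH_PREFIX = "__magic_job-";

  private MagicPathStreamSketch() {
  }

  /** True if any path element starts with the magic prefix. */
  static boolean isMagicPath(List<String> elements) {
    return elements.stream()
        .anyMatch(element -> element.startsWith(MAGIC_PATH_PREFIX));
  }
}
```

anyMatch short-circuits on the first hit, so it does the same work as an early-return loop.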
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java:
##########
@@ -151,10 +151,10 @@ public void testMagicMkdir() throws Throwable {
*/
@Test
public void testMagicMkdirs() throws Throwable {
- describe("Mkdirs __magic/subdir always skips dir marker deletion");
+ describe("Mkdirs __magic_job-/subdir always skips dir marker deletion");
S3AFileSystem fs = getFileSystem();
Path baseDir = methodPath();
- Path magicDir = new Path(baseDir, MAGIC);
+ Path magicDir = new Path(baseDir, MAGIC_PATH_PREFIX);
Review Comment:
for this test use the base prefix so we can see that it is still recognised
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java:
##########
@@ -131,16 +134,19 @@ protected ActiveCommit listPendingUploadsToCommit(
* Delete the magic directory.
*/
public void cleanupStagingDirs() {
- final Path out = getOutputPath();
- Path path = magicSubdir(out);
- try(DurationInfo ignored = new DurationInfo(LOG, true,
- "Deleting magic directory %s", path)) {
- Invoker.ignoreIOExceptions(LOG, "cleanup magic directory",
path.toString(),
- () -> deleteWithWarning(getDestFS(), path, true));
- // and the job temp directory with manifests
- Invoker.ignoreIOExceptions(LOG, "cleanup job directory", path.toString(),
- () -> deleteWithWarning(getDestFS(),
- new Path(out, TEMP_DATA), true));
+ if (cleanupMagicDirectory) {
+ final Path out = getOutputPath();
+ Path path = getMagicJobPath(getUUID(), out);
+ try (DurationInfo ignored = new DurationInfo(LOG, true,
+ "Deleting magic directory %s", path)) {
+ Invoker.ignoreIOExceptions(LOG, "cleanup magic directory",
path.toString(),
+ () -> deleteWithWarning(getDestFS(), path, true));
+ // and the job temp directory with manifests
+ Invoker.ignoreIOExceptions(LOG, "cleanup job directory",
path.toString(),
Review Comment:
job dir cleanup MUST always happen; make the root path cleanup the optional
one
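One reading of this, sketched without any Hadoop dependencies: the TEMP_DATA job directory is always deleted and only the magic path deletion sits behind the flag. Deleter is a hypothetical stand-in for deleteWithWarning(getDestFS(), path, true); the string paths, the TEMP_DATA value and the magic prefix are placeholders, not the real constants.

```java
final class CleanupOrderSketch {
  // Placeholder values, for illustration only.
  static final String TEMP_DATA = "__temp-data";
  static final String MAGIC_PATH_PREFIX = "__magic_job-";

  /** Hypothetical stand-in for deleteWithWarning(getDestFS(), path, true). */
  interface Deleter {
    void delete(String path);
  }

  private CleanupOrderSketch() {
  }

  static void cleanupStagingDirs(String out, String jobUuid,
      boolean cleanupMagicDirectory, Deleter deleter) {
    // The job temp directory with manifests is always cleaned up.
    deleter.delete(out + "/" + TEMP_DATA);
    // Only the magic path cleanup is optional.
    if (cleanupMagicDirectory) {
      deleter.delete(out + "/" + MAGIC_PATH_PREFIX + jobUuid);
    }
  }
}
```

With the flag off, a job still removes its own temp data and leaves the magic path alone.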
> S3A committer fix lost data on concurrent jobs
> ----------------------------------------------
>
> Key: HADOOP-18797
> URL: https://issues.apache.org/jira/browse/HADOOP-18797
> Project: Hadoop Common
> Issue Type: Bug
> Components: fs/s3
> Reporter: Emanuel Velzi
> Assignee: Syed Shameerur Rahman
> Priority: Major
> Labels: pull-request-available
>
> There is a failure in the commit process when multiple jobs are writing to an
> S3 directory *concurrently* using {*}magic committers{*}.
> This issue is closely related to HADOOP-17318.
> When multiple Spark jobs write to the same S3A directory, they upload files
> simultaneously using "__magic" as the base directory for staging. Inside this
> directory, there are multiple "/job-some-uuid" directories, each representing
> a concurrently running job.
> To fix some problems related to concurrency, a property was introduced in
> the previous fix: "spark.hadoop.fs.s3a.committer.abort.pending.uploads". When
> set to false, it ensures that during the cleanup stage, finalizing jobs do
> not abort pending uploads from other jobs. So we see in logs this line:
> {code:java}
> DEBUG [main] o.a.h.fs.s3a.commit.AbstractS3ACommitter (819): Not cleanup up
> pending uploads to s3a ...{code}
> (from
> [AbstractS3ACommitter.java#L952|https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java#L952])
> However, in the next step, the {*}"__magic" directory is recursively
> deleted{*}:
> {code:java}
> INFO [main] o.a.h.fs.s3a.commit.magic.MagicS3GuardCommitter (98): Deleting
> magic directory s3a://my-bucket/my-table/__magic: duration 0:00.560s {code}
> (from
> [AbstractS3ACommitter.java#L1112|https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java#L1112]
> and
> [MagicS3GuardCommitter.java#L137|https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java#L137])
> This deletion operation *affects the second job* that is still running
> because it loses pending uploads (i.e., ".pendingset" and ".pending" files).
> The consequences can range from an exception in the best case to a silent
> loss of data in the worst case. The latter occurs when Job_1 deletes files
> just before Job_2 executes "listPendingUploadsToCommit" to list ".pendingset"
> files in the job attempt directory, before completing the uploads with POST
> requests.
> To resolve this issue, it's important {*}to ensure that only the prefix
> associated with the job currently finalizing is cleaned{*}.
> Here's a possible solution:
> {code:java}
> /**
> * Delete the magic directory.
> */
> public void cleanupStagingDirs() {
> final Path out = getOutputPath();
> //Path path = magicSubdir(getOutputPath());
> Path path = new Path(magicSubdir(out), formatJobDir(getUUID()));
> try(DurationInfo ignored = new DurationInfo(LOG, true,
> "Deleting magic directory %s", path)) {
> Invoker.ignoreIOExceptions(LOG, "cleanup magic directory",
> path.toString(),
> () -> deleteWithWarning(getDestFS(), path, true));
> }
> } {code}
>
> The side effect of this fix is that the "__magic" directory itself is never
> cleaned up. However, I believe this is a minor concern, given that other
> entries such as "_SUCCESS" also persist after jobs end.