[
https://issues.apache.org/jira/browse/HADOOP-17318?focusedWorklogId=511370&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-511370
]
ASF GitHub Bot logged work on HADOOP-17318:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 13/Nov/20 13:49
Start Date: 13/Nov/20 13:49
Worklog Time Spent: 10m
Work Description: steveloughran commented on a change in pull request
#2399:
URL: https://github.com/apache/hadoop/pull/2399#discussion_r522961040
##########
File path:
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java
##########
@@ -1430,6 +1450,255 @@ public void testParallelJobsToAdjacentPaths() throws
Throwable {
}
+
+ /**
+ * Run two jobs with the same destination and different output paths.
+ * <p></p>
+ * This only works if the jobs are set to NOT delete all outstanding
+ * uploads under the destination path.
+ * <p></p>
+ * See HADOOP-17318.
+ */
+ @Test
+ public void testParallelJobsToSameDestination() throws Throwable {
+
+ describe("Run two jobs to the same destination, assert they both
complete");
+ Configuration conf = getConfiguration();
+ conf.setBoolean(FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS, false);
+
+ // this job has a job ID generated and set as the spark UUID;
+ // the config is also set to require it.
+ // This mimics the Spark setup process.
+
+ String stage1Id = UUID.randomUUID().toString();
+ conf.set(SPARK_WRITE_UUID, stage1Id);
+ conf.setBoolean(FS_S3A_COMMITTER_REQUIRE_UUID, true);
+
+ // create the job and write data in its task attempt
+ JobData jobData = startJob(true);
+ Job job1 = jobData.job;
+ AbstractS3ACommitter committer1 = jobData.committer;
+ JobContext jContext1 = jobData.jContext;
+ TaskAttemptContext tContext1 = jobData.tContext;
+ Path job1TaskOutputFile = jobData.writtenTextPath;
+
+ // the write path
+ Assertions.assertThat(committer1.getWorkPath().toString())
+ .describedAs("Work path path of %s", committer1)
+ .contains(stage1Id);
+ // now build up a second job
+ String jobId2 = randomJobId();
+
+ // second job will use same ID
+ String attempt2 = taskAttempt0.toString();
+ TaskAttemptID taskAttempt2 = taskAttempt0;
+
+ // create the second job
+ Configuration c2 = unsetUUIDOptions(new JobConf(conf));
+ c2.setBoolean(FS_S3A_COMMITTER_REQUIRE_UUID, true);
+ Job job2 = newJob(outDir,
+ c2,
+ attempt2);
+ Configuration conf2 = job2.getConfiguration();
+ conf2.set("mapreduce.output.basename", "task2");
+ String stage2Id = UUID.randomUUID().toString();
+ conf2.set(SPARK_WRITE_UUID,
+ stage2Id);
+
+ JobContext jContext2 = new JobContextImpl(conf2,
+ taskAttempt2.getJobID());
+ TaskAttemptContext tContext2 =
+ new TaskAttemptContextImpl(conf2, taskAttempt2);
+ AbstractS3ACommitter committer2 = createCommitter(outDir, tContext2);
+ Assertions.assertThat(committer2.getJobAttemptPath(jContext2))
+ .describedAs("Job attempt path of %s", committer2)
+ .isNotEqualTo(committer1.getJobAttemptPath(jContext1));
+ Assertions.assertThat(committer2.getTaskAttemptPath(tContext2))
+ .describedAs("Task attempt path of %s", committer2)
+ .isNotEqualTo(committer1.getTaskAttemptPath(tContext1));
+ Assertions.assertThat(committer2.getWorkPath().toString())
+ .describedAs("Work path path of %s", committer2)
+ .isNotEqualTo(committer1.getWorkPath().toString())
+ .contains(stage2Id);
+ Assertions.assertThat(committer2.getUUIDSource())
+ .describedAs("UUID source of %s", committer2)
+ .isEqualTo(AbstractS3ACommitter.JobUUIDSource.SparkWriteUUID);
+ JobData jobData2 = new JobData(job2, jContext2, tContext2, committer2);
+ setup(jobData2);
+ abortInTeardown(jobData2);
+
+ // the sequence is designed to ensure that job2 has active multipart
+ // uploads during/after job1's work
+
+ // if the committer is a magic committer, MPUs start in the write,
+ // otherwise in task commit.
+ boolean multipartInitiatedInWrite =
+ committer2 instanceof MagicS3GuardCommitter;
+
+ // job2. Here we start writing a file and have that write in progress
+ // when job 1 commits.
+
+ LoggingTextOutputFormat.LoggingLineRecordWriter<Object, Object>
+ recordWriter2 = new LoggingTextOutputFormat<>().getRecordWriter(
+ tContext2);
+
+ LOG.info("Commit Task 1");
+ commitTask(committer1, tContext1);
+
+ if (multipartInitiatedInWrite) {
+ // magic committer runs -commit job1 while a job2 TA has an open
+ // writer (and hence: open MP Upload)
+ LOG.info("Commit Job 1");
Review comment:
done
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 511370)
Time Spent: 6h 40m (was: 6.5h)
> S3A committer to support concurrent jobs with same app attempt ID & dest dir
> ----------------------------------------------------------------------------
>
> Key: HADOOP-17318
> URL: https://issues.apache.org/jira/browse/HADOOP-17318
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/s3
> Affects Versions: 3.3.0
> Reporter: Steve Loughran
> Assignee: Steve Loughran
> Priority: Minor
> Labels: pull-request-available
> Time Spent: 6h 40m
> Remaining Estimate: 0h
>
> Reported failure of magic committer block uploads as pending upload ID is
> unknown. Likely cause: it's been aborted by another job
> # Make it possible to turn off cleanup of pending uploads in magic committer
> # log more about uploads being deleted in committers
> # and upload ID in the S3aBlockOutputStream errors
> There are other concurrency issues when you look close, see SPARK-33230
> * magic committer uses app attempt ID as path under __magic; if there are
> duplicate then they will conflict
> * staging committer local temp dir uses app attempt id
> Fix will be to have a job UUID which for spark will be picked up from the
> SPARK-33230 changes, (option to self-generate in job setup for hadoop 3.3.1+
> older spark builds); fall back to app-attempt *unless that fallback has been
> disabled*
> MR: configure to use app attempt ID
> Spark: configure to fail job setup if app attempt ID is the source of a job
> uuid
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]