steveloughran commented on a change in pull request #2399:
URL: https://github.com/apache/hadoop/pull/2399#discussion_r522961040
##########
File path:
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java
##########
@@ -1430,6 +1450,255 @@ public void testParallelJobsToAdjacentPaths() throws
Throwable {
}
+
+ /**
+ * Run two jobs with the same destination and different output paths.
+ * <p></p>
+ * This only works if the jobs are set to NOT delete all outstanding
+ * uploads under the destination path.
+ * <p></p>
+ * See HADOOP-17318.
+ */
+ @Test
+ public void testParallelJobsToSameDestination() throws Throwable {
+
+ describe("Run two jobs to the same destination, assert they both
complete");
+ Configuration conf = getConfiguration();
+ conf.setBoolean(FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS, false);
+
+ // this job has a job ID generated and set as the spark UUID;
+ // the config is also set to require it.
+ // This mimics the Spark setup process.
+
+ String stage1Id = UUID.randomUUID().toString();
+ conf.set(SPARK_WRITE_UUID, stage1Id);
+ conf.setBoolean(FS_S3A_COMMITTER_REQUIRE_UUID, true);
+
+ // create the job and write data in its task attempt
+ JobData jobData = startJob(true);
+ Job job1 = jobData.job;
+ AbstractS3ACommitter committer1 = jobData.committer;
+ JobContext jContext1 = jobData.jContext;
+ TaskAttemptContext tContext1 = jobData.tContext;
+ Path job1TaskOutputFile = jobData.writtenTextPath;
+
+ // the write path
+ Assertions.assertThat(committer1.getWorkPath().toString())
+ .describedAs("Work path path of %s", committer1)
+ .contains(stage1Id);
+ // now build up a second job
+ String jobId2 = randomJobId();
+
+ // second job will use same ID
+ String attempt2 = taskAttempt0.toString();
+ TaskAttemptID taskAttempt2 = taskAttempt0;
+
+ // create the second job
+ Configuration c2 = unsetUUIDOptions(new JobConf(conf));
+ c2.setBoolean(FS_S3A_COMMITTER_REQUIRE_UUID, true);
+ Job job2 = newJob(outDir,
+ c2,
+ attempt2);
+ Configuration conf2 = job2.getConfiguration();
+ conf2.set("mapreduce.output.basename", "task2");
+ String stage2Id = UUID.randomUUID().toString();
+ conf2.set(SPARK_WRITE_UUID,
+ stage2Id);
+
+ JobContext jContext2 = new JobContextImpl(conf2,
+ taskAttempt2.getJobID());
+ TaskAttemptContext tContext2 =
+ new TaskAttemptContextImpl(conf2, taskAttempt2);
+ AbstractS3ACommitter committer2 = createCommitter(outDir, tContext2);
+ Assertions.assertThat(committer2.getJobAttemptPath(jContext2))
+ .describedAs("Job attempt path of %s", committer2)
+ .isNotEqualTo(committer1.getJobAttemptPath(jContext1));
+ Assertions.assertThat(committer2.getTaskAttemptPath(tContext2))
+ .describedAs("Task attempt path of %s", committer2)
+ .isNotEqualTo(committer1.getTaskAttemptPath(tContext1));
+ Assertions.assertThat(committer2.getWorkPath().toString())
+ .describedAs("Work path path of %s", committer2)
+ .isNotEqualTo(committer1.getWorkPath().toString())
+ .contains(stage2Id);
+ Assertions.assertThat(committer2.getUUIDSource())
+ .describedAs("UUID source of %s", committer2)
+ .isEqualTo(AbstractS3ACommitter.JobUUIDSource.SparkWriteUUID);
+ JobData jobData2 = new JobData(job2, jContext2, tContext2, committer2);
+ setup(jobData2);
+ abortInTeardown(jobData2);
+
+ // the sequence is designed to ensure that job2 has active multipart
+ // uploads during/after job1's work
+
+ // if the committer is a magic committer, MPUs start in the write,
+ // otherwise in task commit.
+ boolean multipartInitiatedInWrite =
+ committer2 instanceof MagicS3GuardCommitter;
+
+ // job2. Here we start writing a file and have that write in progress
+ // when job 1 commits.
+
+ LoggingTextOutputFormat.LoggingLineRecordWriter<Object, Object>
+ recordWriter2 = new LoggingTextOutputFormat<>().getRecordWriter(
+ tContext2);
+
+ LOG.info("Commit Task 1");
+ commitTask(committer1, tContext1);
+
+ if (multipartInitiatedInWrite) {
+ // magic committer runs -commit job1 while a job2 TA has an open
+ // writer (and hence: open MP Upload)
+ LOG.info("Commit Job 1");
Review comment:
done
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]