liuml07 commented on a change in pull request #2399:
URL: https://github.com/apache/hadoop/pull/2399#discussion_r521130866



##########
File path: 
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
##########
@@ -585,7 +589,8 @@ public BulkOperationState initiateOperation(final Path path,
   @Retries.RetryTranslated
   public UploadPartResult uploadPart(UploadPartRequest request)
       throws IOException {
-    return retry("upload part",
+    return retry("upload part #" + request.getPartNumber()
+        + " upload "+ request.getUploadId(),

Review comment:
       nit: s/upload/upload ID/
   
   I was thinking of consistent log keywords so taht for any retry log we can 
search "upload ID" or "commit ID"

##########
File path: 
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
##########
@@ -131,6 +131,8 @@ protected WriteOperationHelper(S3AFileSystem owner, 
Configuration conf) {
    */
   void operationRetried(String text, Exception ex, int retries,
       boolean idempotent) {
+    LOG.info("{}: Retried {}: {}", retries, text, ex.toString());

Review comment:
       the order of parameter is wrong.

##########
File path: 
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java
##########
@@ -1430,6 +1450,255 @@ public void testParallelJobsToAdjacentPaths() throws 
Throwable {
 
   }
 
+
+  /**
+   * Run two jobs with the same destination and different output paths.
+   * <p></p>
+   * This only works if the jobs are set to NOT delete all outstanding
+   * uploads under the destination path.
+   * <p></p>
+   * See HADOOP-17318.
+   */
+  @Test
+  public void testParallelJobsToSameDestination() throws Throwable {
+
+    describe("Run two jobs to the same destination, assert they both 
complete");
+    Configuration conf = getConfiguration();
+    conf.setBoolean(FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS, false);
+
+    // this job has a job ID generated and set as the spark UUID;
+    // the config is also set to require it.
+    // This mimics the Spark setup process.
+
+    String stage1Id = UUID.randomUUID().toString();
+    conf.set(SPARK_WRITE_UUID, stage1Id);
+    conf.setBoolean(FS_S3A_COMMITTER_REQUIRE_UUID, true);
+
+    // create the job and write data in its task attempt
+    JobData jobData = startJob(true);
+    Job job1 = jobData.job;
+    AbstractS3ACommitter committer1 = jobData.committer;
+    JobContext jContext1 = jobData.jContext;
+    TaskAttemptContext tContext1 = jobData.tContext;
+    Path job1TaskOutputFile = jobData.writtenTextPath;
+
+    // the write path
+    Assertions.assertThat(committer1.getWorkPath().toString())
+        .describedAs("Work path path of %s", committer1)
+        .contains(stage1Id);
+    // now build up a second job
+    String jobId2 = randomJobId();
+
+    // second job will use same ID
+    String attempt2 = taskAttempt0.toString();
+    TaskAttemptID taskAttempt2 = taskAttempt0;
+
+    // create the second job
+    Configuration c2 = unsetUUIDOptions(new JobConf(conf));
+    c2.setBoolean(FS_S3A_COMMITTER_REQUIRE_UUID, true);
+    Job job2 = newJob(outDir,
+        c2,
+        attempt2);
+    Configuration conf2 = job2.getConfiguration();

Review comment:
       nit: may call this `conf2` like `jobConf2` to make it a bit clearer.

##########
File path: 
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md
##########
@@ -535,20 +535,28 @@ Conflict management is left to the execution engine 
itself.
 
 | Option | Magic | Directory | Partitioned | Meaning | Default |
 |--------|-------|-----------|-------------|---------|---------|
-| `mapreduce.fileoutputcommitter.marksuccessfuljobs` | X | X | X | Write a 
`_SUCCESS` file  at the end of each job | `true` |
+| `mapreduce.fileoutputcommitter.marksuccessfuljobs` | X | X | X | Write a 
`_SUCCESS` file on the successful completion of the job. | `true` |
+| `fs.s3a.buffer.dir` | X | X | X | Local filesystem directory for data being 
written and/or staged. | `${hadoop.tmp.dir}/s3a` |
+| `fs.s3a.committer.magic.enabled` | X |  | | Enable "magic committer" support 
in the filesystem. | `false` |
+| `fs.s3a.committer.abort.pending.uploads` | X | X | X | list and abort all 
pending uploads under the destination path when the job is committed or 
aborted. | `true` |
 | `fs.s3a.committer.threads` | X | X | X | Number of threads in committers for 
parallel operations on files. | 8 |
-| `fs.s3a.committer.staging.conflict-mode` |  | X | X | Conflict resolution: 
`fail`, `append` or `replace`| `append` |
-| `fs.s3a.committer.staging.unique-filenames` |  | X | X | Generate unique 
filenames | `true` |
-| `fs.s3a.committer.magic.enabled` | X |  | | Enable "magic committer" support 
in the filesystem | `false` |
+| `fs.s3a.committer.generate.uuid` |  | X | X | Generate a Job UUID if none is 
passed down from Spark | `false` |
+| `fs.s3a.committer.require.uuid` |  | X | X | Require the Job UUID to be 
passed down from Spark | `false` |
 
 
+Staging committer (Directory and Partitioned) options
 
 
 | Option | Magic | Directory | Partitioned | Meaning | Default |
 |--------|-------|-----------|-------------|---------|---------|
-| `fs.s3a.buffer.dir` | X | X | X | Local filesystem directory for data being 
written and/or staged. | |
-| `fs.s3a.committer.staging.tmp.path` |  | X | X | Path in the cluster 
filesystem for temporary data | `tmp/staging` |
 
+| `fs.s3a.committer.staging.conflict-mode` |  | X | X | Conflict resolution: 
`fail`, `append` or `replace`| `append` |

Review comment:
       There is an empty line between the table header and this first row. I 
see github online viewer is not blessing this. Maybe we just remove LoC 552

##########
File path: 
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
##########
@@ -118,15 +114,14 @@ public StagingCommitter(Path outputPath,
     Configuration conf = getConf();
     this.uploadPartSize = conf.getLongBytes(
         MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE);
-    this.uuid = getUploadUUID(conf, context.getJobID());
     this.uniqueFilenames = conf.getBoolean(
         FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES,
         DEFAULT_STAGING_COMMITTER_UNIQUE_FILENAMES);
-    setWorkPath(buildWorkPath(context, uuid));
+    setWorkPath(buildWorkPath(context, this.getUUID()));

Review comment:
       nit: just `getUUID()` without `this.`?

##########
File path: 
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java
##########
@@ -264,4 +283,25 @@ private CommitConstants() {
   /** Extra Data key for task attempt in pendingset files. */
   public static final String TASK_ATTEMPT_ID = "task.attempt.id";
 
+  /**
+   * Require the spark UUID to be passed down: {@value}.
+   * This is to verify that SPARK-33230 has been applied to spark, and that
+   * {@link InternalCommitterConstants#SPARK_WRITE_UUID} is set.
+   * <p>
+   *   MUST ONLY BE SET WITH SPARK JOBS.
+   * </p>
+   */

Review comment:
       nit: We can make it clear in javadoc here that default value is false. 
Same the generate.uuid below.

##########
File path: 
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java
##########
@@ -1430,6 +1450,255 @@ public void testParallelJobsToAdjacentPaths() throws 
Throwable {
 
   }
 
+
+  /**
+   * Run two jobs with the same destination and different output paths.
+   * <p></p>
+   * This only works if the jobs are set to NOT delete all outstanding
+   * uploads under the destination path.
+   * <p></p>
+   * See HADOOP-17318.
+   */
+  @Test
+  public void testParallelJobsToSameDestination() throws Throwable {
+
+    describe("Run two jobs to the same destination, assert they both 
complete");
+    Configuration conf = getConfiguration();
+    conf.setBoolean(FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS, false);
+
+    // this job has a job ID generated and set as the spark UUID;
+    // the config is also set to require it.
+    // This mimics the Spark setup process.
+
+    String stage1Id = UUID.randomUUID().toString();
+    conf.set(SPARK_WRITE_UUID, stage1Id);
+    conf.setBoolean(FS_S3A_COMMITTER_REQUIRE_UUID, true);
+
+    // create the job and write data in its task attempt
+    JobData jobData = startJob(true);
+    Job job1 = jobData.job;
+    AbstractS3ACommitter committer1 = jobData.committer;
+    JobContext jContext1 = jobData.jContext;
+    TaskAttemptContext tContext1 = jobData.tContext;
+    Path job1TaskOutputFile = jobData.writtenTextPath;
+
+    // the write path
+    Assertions.assertThat(committer1.getWorkPath().toString())
+        .describedAs("Work path path of %s", committer1)
+        .contains(stage1Id);
+    // now build up a second job
+    String jobId2 = randomJobId();
+
+    // second job will use same ID
+    String attempt2 = taskAttempt0.toString();
+    TaskAttemptID taskAttempt2 = taskAttempt0;
+
+    // create the second job
+    Configuration c2 = unsetUUIDOptions(new JobConf(conf));
+    c2.setBoolean(FS_S3A_COMMITTER_REQUIRE_UUID, true);
+    Job job2 = newJob(outDir,
+        c2,
+        attempt2);
+    Configuration conf2 = job2.getConfiguration();
+    conf2.set("mapreduce.output.basename", "task2");
+    String stage2Id = UUID.randomUUID().toString();
+    conf2.set(SPARK_WRITE_UUID,
+        stage2Id);
+
+    JobContext jContext2 = new JobContextImpl(conf2,
+        taskAttempt2.getJobID());
+    TaskAttemptContext tContext2 =
+        new TaskAttemptContextImpl(conf2, taskAttempt2);
+    AbstractS3ACommitter committer2 = createCommitter(outDir, tContext2);
+    Assertions.assertThat(committer2.getJobAttemptPath(jContext2))
+        .describedAs("Job attempt path of %s", committer2)
+        .isNotEqualTo(committer1.getJobAttemptPath(jContext1));
+    Assertions.assertThat(committer2.getTaskAttemptPath(tContext2))
+        .describedAs("Task attempt path of %s", committer2)
+        .isNotEqualTo(committer1.getTaskAttemptPath(tContext1));
+    Assertions.assertThat(committer2.getWorkPath().toString())
+        .describedAs("Work path path of %s", committer2)
+        .isNotEqualTo(committer1.getWorkPath().toString())
+        .contains(stage2Id);
+    Assertions.assertThat(committer2.getUUIDSource())
+        .describedAs("UUID source of %s", committer2)
+        .isEqualTo(AbstractS3ACommitter.JobUUIDSource.SparkWriteUUID);
+    JobData jobData2 = new JobData(job2, jContext2, tContext2, committer2);
+    setup(jobData2);
+    abortInTeardown(jobData2);
+
+    // the sequence is designed to ensure that job2 has active multipart
+    // uploads during/after job1's work
+
+    // if the committer is a magic committer, MPUs start in the write,
+    // otherwise in task commit.
+    boolean multipartInitiatedInWrite =
+        committer2 instanceof MagicS3GuardCommitter;
+
+    // job2. Here we start writing a file and have that write in progress
+    // when job 1 commits.
+
+    LoggingTextOutputFormat.LoggingLineRecordWriter<Object, Object>
+        recordWriter2 = new LoggingTextOutputFormat<>().getRecordWriter(
+            tContext2);
+
+    LOG.info("Commit Task 1");
+    commitTask(committer1, tContext1);
+
+    if (multipartInitiatedInWrite) {
+      // magic committer runs -commit job1 while a job2 TA has an open
+      // writer (and hence: open MP Upload)
+      LOG.info("Commit Job 1");

Review comment:
       nit: add `multipartInitiatedInWrite` to the log message? Same below

##########
File path: 
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/InternalCommitterConstants.java
##########
@@ -97,4 +97,30 @@ private InternalCommitterConstants() {
   /** Error message for a path without a magic element in the list: {@value}. 
*/
   public static final String E_NO_MAGIC_PATH_ELEMENT
       = "No " + MAGIC + " element in path";
+
+  /**
+   * The UUID for jobs: {@value}.
+   * This was historically created in Spark 1.x's SQL queries, but "went away".
+   */
+  public static final String SPARK_WRITE_UUID =
+      "spark.sql.sources.writeJobUUID";
+
+  /**
+   * The App ID for jobs: {@value}.
+   */
+  public static final String SPARK_APP_ID = "spark.app.id";

Review comment:
       Is this SPARK app ID name constant still used, or I missed something? 🤔 

##########
File path: 
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
##########
@@ -175,48 +171,17 @@ protected void initFileOutputCommitterOptions(JobContext 
context) {
   public String toString() {
     final StringBuilder sb = new StringBuilder("StagingCommitter{");
     sb.append(super.toString());
+    sb.append(", commitsDirectory=").append(commitsDirectory);
+    sb.append(", uniqueFilenames=").append(uniqueFilenames);
     sb.append(", conflictResolution=").append(conflictResolution);
+    sb.append(". uploadPartSize=").append(uploadPartSize);

Review comment:
       nit: s/./,

##########
File path: 
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SuccessData.java
##########
@@ -68,7 +68,7 @@
   /**
    * Serialization ID: {@value}.
    */
-  private static final long serialVersionUID = 507133045258460084L;
+  private static final long serialVersionUID = 507133045258460083L + VERSION;

Review comment:
       😄 👍 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to