[ 
https://issues.apache.org/jira/browse/HADOOP-17318?focusedWorklogId=510916&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-510916
 ]

ASF GitHub Bot logged work on HADOOP-17318:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 12/Nov/20 17:27
            Start Date: 12/Nov/20 17:27
    Worklog Time Spent: 10m 
      Work Description: rdblue commented on a change in pull request #2399:
URL: https://github.com/apache/hadoop/pull/2399#discussion_r522277893



##########
File path: 
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java
##########
@@ -147,6 +173,11 @@ protected AbstractS3ACommitter(
     this.jobContext = context;
     this.role = "Task committer " + context.getTaskAttemptID();
     setConf(context.getConfiguration());
+    Pair<String, JobUUIDSource> id = buildJobUUID(
+        conf, context.getJobID());
+    uuid = id.getLeft();
+    uuidSource = id.getRight();

Review comment:
       Other places use `this.` as a prefix when setting fields. I find that 
helpful when reading: it makes clear that an instance field is being set 
rather than a local variable.
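
For illustration, a minimal sketch of the suggested style applied to the assignments from the diff above:

```java
// `this.` signals that instance fields, not locals, are being assigned.
Pair<String, JobUUIDSource> id = buildJobUUID(conf, context.getJobID());
this.uuid = id.getLeft();
this.uuidSource = id.getRight();
```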

##########
File path: 
hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
##########
@@ -1925,20 +1925,13 @@
 </property>
 
 <property>
-  <name>fs.s3a.committer.staging.abort.pending.uploads</name>
+  <name>fs.s3a.committer.abort.pending.uploads</name>
   <value>true</value>
   <description>
-    Should the staging committers abort all pending uploads to the destination
+    Should the committers abort all pending uploads to the destination
     directory?
 
-    Changing this if more than one partitioned committer is
-    writing to the same destination tree simultaneously; otherwise
-    the first job to complete will cancel all outstanding uploads from the
-    others. However, it may lead to leaked outstanding uploads from failed
-    tasks. If disabled, configure the bucket lifecycle to remove uploads
-    after a time period, and/or set up a workflow to explicitly delete
-    entries. Otherwise there is a risk that uncommitted uploads may run up
-    bills.
+    Set to false if more than one job is writing to the same directory tree.

Review comment:
       Committers don't cancel just their own pending uploads?
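
For context, a minimal sketch of the scenario the new description text targets, assuming abandoned uploads are instead reaped by a bucket lifecycle rule (property name taken from the diff above):

```java
// Several jobs write to the same destination tree concurrently:
// don't abort other jobs' pending uploads during job cleanup.
Configuration conf = new Configuration();
conf.setBoolean("fs.s3a.committer.abort.pending.uploads", false);
```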

##########
File path: 
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java
##########
@@ -411,26 +464,63 @@ protected void maybeCreateSuccessMarker(JobContext 
context,
    * be deleted; creating it now ensures there is something at the end
    * while the job is in progress -and if nothing is created, that
    * it is still there.
+   * <p>
+   *   The option {@link InternalCommitterConstants#FS_S3A_COMMITTER_UUID}
+   *   is set to the job UUID; if generated locally
+   *   {@link InternalCommitterConstants#SPARK_WRITE_UUID} is also patched.
+   *   The field {@link #jobSetup} is set to true to note that
+   *   this specific committer instance was used to set up a job.
+   * </p>
    * @param context context
    * @throws IOException IO failure
    */
 
   @Override
   public void setupJob(JobContext context) throws IOException {
-    try (DurationInfo d = new DurationInfo(LOG, "preparing destination")) {
+    try (DurationInfo d = new DurationInfo(LOG,
+        "Job %s setting up", getUUID())) {
+      // record that the job has been set up
+      jobSetup = true;
+      // patch job conf with the job UUID.
+      Configuration c = context.getConfiguration();
+      c.set(FS_S3A_COMMITTER_UUID, this.getUUID());
+      if (getUUIDSource() == JobUUIDSource.GeneratedLocally) {
+        // we set the UUID up locally. Save it back to the job configuration
+        c.set(SPARK_WRITE_UUID, this.getUUID());

Review comment:
       It seems odd to set the Spark property. Does anything else use this?
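
For reference, a sketch (using only the constants and method shown in the diffs on this PR) of the round trip the committer's own property already provides, which is part of why the extra Spark property looks redundant:

```java
// Job side: setupJob() has patched the job configuration with the UUID.
conf.set(FS_S3A_COMMITTER_UUID, getUUID());

// Task side: a committer built from the propagated configuration resolves
// the same UUID; FS_S3A_COMMITTER_UUID is checked before SPARK_WRITE_UUID.
Pair<String, JobUUIDSource> id = buildJobUUID(conf, context.getJobID());
// id.getRight() == JobUUIDSource.CommitterUUIDProperty
```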

##########
File path: 
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java
##########
@@ -1044,6 +1166,155 @@ protected void abortPendingUploads(
     }
   }
 
+  /**
+   * Scan for active uploads and list them along with a warning message.
+   * Errors are ignored.
+   * @param path output path of job.
+   */
+  protected void warnOnActiveUploads(final Path path) {
+    List<MultipartUpload> pending;
+    try {
+      pending = getCommitOperations()
+          .listPendingUploadsUnderPath(path);
+    } catch (IOException e) {
+      LOG.debug("Failed to list uploads under {}",
+          path, e);
+      return;
+    }
+    if (!pending.isEmpty()) {
+      // log a warning
+      LOG.warn("{} active upload(s) in progress under {}",
+          pending.size(),
+          path);
+      LOG.warn("Either jobs are running concurrently"
+          + " or failed jobs are not being cleaned up");
+      // and the paths + timestamps
+      DateFormat df = DateFormat.getDateTimeInstance();
+      pending.forEach(u ->
+          LOG.info("[{}] {}",
+              df.format(u.getInitiated()),
+              u.getKey()));
+      if (shouldAbortUploadsInCleanup()) {
+        LOG.warn("This committer will abort these uploads in job cleanup");
+      }
+    }
+  }
+
+  /**
+   * Build the job UUID.
+   *
+   * <p>
+   *  In MapReduce jobs, the application ID is issued by YARN, and
+   *  unique across all jobs.
+   * </p>
+   * <p>
+   * Spark will use a fake app ID based on the current time.
+   * This can lead to collisions on busy clusters.
+   *
+   * </p>
+   * <ol>
+   *   <li>Value of
+   *   {@link InternalCommitterConstants#FS_S3A_COMMITTER_UUID}.</li>
+   *   <li>Value of
+   *   {@link InternalCommitterConstants#SPARK_WRITE_UUID}.</li>
+   *   <li>If enabled: Self-generated uuid.</li>
+   *   <li>If not disabled: Application ID</li>
+   * </ol>
+   * The UUID bonding takes place during construction;
+   * the staging committers use it to set up their wrapped
+   * committer to a path in the cluster FS which is unique to the
+   * job.
+   * In {@link #setupJob(JobContext)} the job context's configuration
+   * will be patched with the UUID, so it will
+   * be valid in all sequences where the job has been set up with the
+   * configuration passed in.
+   * <p>
+   *   If the option {@link CommitConstants#FS_S3A_COMMITTER_REQUIRE_UUID}
+   *   is set, then an external UUID MUST be passed in.
+   *   This can be used to verify that the spark engine is reliably setting
+   *   unique IDs for staging.
+   * </p>
+   * @param conf job/task configuration
+   * @param jobId job ID from YARN or spark.
+   * @return Job UUID and source of it.
+   * @throws PathCommitException no UUID was found and it was required
+   */
+  public static Pair<String, JobUUIDSource>
+      buildJobUUID(Configuration conf, JobID jobId)
+      throws PathCommitException {
+
+    String jobUUID = conf.getTrimmed(FS_S3A_COMMITTER_UUID, "");
+
+    if (!jobUUID.isEmpty()) {
+      return Pair.of(jobUUID, JobUUIDSource.CommitterUUIDProperty);
+    }
+    // there is no job UUID.
+    // look for one from spark
+    jobUUID = conf.getTrimmed(SPARK_WRITE_UUID, "");
+    if (!jobUUID.isEmpty()) {
+      return Pair.of(jobUUID, JobUUIDSource.SparkWriteUUID);
+    }
+
+    // there is no UUID configuration in the job/task config
+
+    // Check the job hasn't declared a requirement for the UUID.
+    // This allows for fail-fast validation of Spark behavior.
+    if (conf.getBoolean(FS_S3A_COMMITTER_REQUIRE_UUID, false)) {
+      throw new PathCommitException("", E_NO_SPARK_UUID);
+    }
+
+    // see if the job can generate a random UUID
+    if (conf.getBoolean(FS_S3A_COMMITTER_GENERATE_UUID, false)) {

Review comment:
       Why would a committer not want to generate a unique ID and use the job 
ID instead?
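
For reference, a hedged sketch of the two opt-in behaviours the resolution order exposes, using the constant names from the diff (the underlying property strings are not shown here):

```java
Configuration conf = new Configuration();

// Fail fast if the engine (e.g. Spark) did not supply a write UUID.
conf.setBoolean(FS_S3A_COMMITTER_REQUIRE_UUID, true);

// Alternatively, let the committer generate a random UUID itself rather
// than falling back to the (possibly non-unique) application/job ID.
conf.setBoolean(FS_S3A_COMMITTER_GENERATE_UUID, true);
```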

##########
File path: 
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java
##########
@@ -202,24 +233,24 @@ protected final void setOutputPath(Path outputPath) {
    * @return the working path.
    */
   @Override
-  public Path getWorkPath() {
+  public final Path getWorkPath() {
     return workPath;
   }
 
   /**
    * Set the work path for this committer.
    * @param workPath the work path to use.
    */
-  protected void setWorkPath(Path workPath) {
+  protected final void setWorkPath(Path workPath) {
     LOG.debug("Setting work path to {}", workPath);
     this.workPath = workPath;
   }
 
-  public Configuration getConf() {
+  public final Configuration getConf() {
     return conf;
   }
 
-  protected void setConf(Configuration conf) {
+  protected final void setConf(Configuration conf) {

Review comment:
       A lot of these changes don't seem related to the UUID change. I think it 
would be easier to review if only necessary changes were in this PR.

##########
File path: 
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java
##########
@@ -1044,6 +1166,155 @@ protected void abortPendingUploads(
     }
   }
 
+  /**
+   * Scan for active uploads and list them along with a warning message.
+   * Errors are ignored.
+   * @param path output path of job.
+   */
+  protected void warnOnActiveUploads(final Path path) {
+    List<MultipartUpload> pending;
+    try {
+      pending = getCommitOperations()
+          .listPendingUploadsUnderPath(path);
+    } catch (IOException e) {
+      LOG.debug("Failed to list uploads under {}",
+          path, e);
+      return;
+    }
+    if (!pending.isEmpty()) {
+      // log a warning
+      LOG.warn("{} active upload(s) in progress under {}",
+          pending.size(),
+          path);
+      LOG.warn("Either jobs are running concurrently"
+          + " or failed jobs are not being cleaned up");
+      // and the paths + timestamps
+      DateFormat df = DateFormat.getDateTimeInstance();
+      pending.forEach(u ->
+          LOG.info("[{}] {}",
+              df.format(u.getInitiated()),
+              u.getKey()));
+      if (shouldAbortUploadsInCleanup()) {
+        LOG.warn("This committer will abort these uploads in job cleanup");
+      }
+    }
+  }
+
+  /**
+   * Build the job UUID.
+   *
+   * <p>
+   *  In MapReduce jobs, the application ID is issued by YARN, and
+   *  unique across all jobs.
+   * </p>
+   * <p>
+   * Spark will use a fake app ID based on the current time.
+   * This can lead to collisions on busy clusters.
+   *
+   * </p>
+   * <ol>
+   *   <li>Value of
+   *   {@link InternalCommitterConstants#FS_S3A_COMMITTER_UUID}.</li>
+   *   <li>Value of
+   *   {@link InternalCommitterConstants#SPARK_WRITE_UUID}.</li>
+   *   <li>If enabled: Self-generated uuid.</li>
+   *   <li>If not disabled: Application ID</li>
+   * </ol>
+   * The UUID bonding takes place during construction;
+   * the staging committers use it to set up their wrapped
+   * committer to a path in the cluster FS which is unique to the
+   * job.
+   * In {@link #setupJob(JobContext)} the job context's configuration
+   * will be patched with the UUID, so it will
+   * be valid in all sequences where the job has been set up with the
+   * configuration passed in.
+   * <p>
+   *   If the option {@link CommitConstants#FS_S3A_COMMITTER_REQUIRE_UUID}
+   *   is set, then an external UUID MUST be passed in.
+   *   This can be used to verify that the spark engine is reliably setting
+   *   unique IDs for staging.
+   * </p>
+   * @param conf job/task configuration
+   * @param jobId job ID from YARN or spark.
+   * @return Job UUID and source of it.
+   * @throws PathCommitException no UUID was found and it was required
+   */
+  public static Pair<String, JobUUIDSource>
+      buildJobUUID(Configuration conf, JobID jobId)
+      throws PathCommitException {
+
+    String jobUUID = conf.getTrimmed(FS_S3A_COMMITTER_UUID, "");
+
+    if (!jobUUID.isEmpty()) {
+      return Pair.of(jobUUID, JobUUIDSource.CommitterUUIDProperty);
+    }
+    // there is no job UUID.
+    // look for one from spark
+    jobUUID = conf.getTrimmed(SPARK_WRITE_UUID, "");
+    if (!jobUUID.isEmpty()) {
+      return Pair.of(jobUUID, JobUUIDSource.SparkWriteUUID);

Review comment:
       This is incorrect if the UUID is self-generated and this method is 
called after `setupJob`. I think that method shouldn't set `SPARK_WRITE_UUID`.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 510916)
    Time Spent: 5h 50m  (was: 5h 40m)

> S3A committer to support concurrent jobs with same app attempt ID & dest dir
> ----------------------------------------------------------------------------
>
>                 Key: HADOOP-17318
>                 URL: https://issues.apache.org/jira/browse/HADOOP-17318
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/s3
>    Affects Versions: 3.3.0
>            Reporter: Steve Loughran
>            Assignee: Steve Loughran
>            Priority: Minor
>              Labels: pull-request-available
>          Time Spent: 5h 50m
>  Remaining Estimate: 0h
>
> Reported failure of magic committer block uploads because the pending upload 
> ID is unknown. Likely cause: it has been aborted by another job.
> # Make it possible to turn off cleanup of pending uploads in the magic committer
> # Log more about uploads being deleted in committers
> # Include the upload ID in S3ABlockOutputStream errors
> There are other concurrency issues when you look closely; see SPARK-33230.
> * The magic committer uses the app attempt ID as the path under __magic; if 
> there are duplicates they will conflict.
> * The staging committer's local temp dir uses the app attempt ID.
> The fix will be to have a job UUID which, for Spark, will be picked up from 
> the SPARK-33230 changes (with an option to self-generate in job setup for 
> Hadoop 3.3.1+ with older Spark builds); fall back to the app attempt ID 
> *unless that fallback has been disabled*.
> MR: configure to use the app attempt ID.
> Spark: configure to fail job setup if the app attempt ID is the source of 
> the job UUID.


