steveloughran commented on a change in pull request #2399:
URL: https://github.com/apache/hadoop/pull/2399#discussion_r523130440
##########
File path:
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java
##########
@@ -411,26 +464,63 @@ protected void maybeCreateSuccessMarker(JobContext context,
* be deleted; creating it now ensures there is something at the end
* while the job is in progress -and if nothing is created, that
* it is still there.
+ * <p>
+ * The option {@link InternalCommitterConstants#FS_S3A_COMMITTER_UUID}
+ * is set to the job UUID; if generated locally
+ * {@link InternalCommitterConstants#SPARK_WRITE_UUID} is also patched.
+ * The field {@link #jobSetup} is set to true to note that
+ * this specific committer instance was used to set up a job.
+ * </p>
* @param context context
* @throws IOException IO failure
*/
@Override
public void setupJob(JobContext context) throws IOException {
- try (DurationInfo d = new DurationInfo(LOG, "preparing destination")) {
+ try (DurationInfo d = new DurationInfo(LOG,
+ "Job %s setting up", getUUID())) {
+ // record that the job has been set up
+ jobSetup = true;
+ // patch job conf with the job UUID.
+ Configuration c = context.getConfiguration();
+ c.set(FS_S3A_COMMITTER_UUID, this.getUUID());
+ if (getUUIDSource() == JobUUIDSource.GeneratedLocally) {
+ // we set the UUID up locally. Save it back to the job configuration
+ c.set(SPARK_WRITE_UUID, this.getUUID());
Review comment:
I was just trying to be rigorous; will roll back. While I'm there I
think I'll add the source attribute, so I can then probe for it in the tests.
I'm already saving it in the _SUCCESS file.
##########
File path:
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
##########
@@ -248,6 +247,47 @@ As an example, the endpoint for S3 Frankfurt is `s3.eu-central-1.amazonaws.com`:
</property>
```
+### `Class does not implement AWSCredentialsProvider`
Review comment:
Going to add that specific bit about Spark Hive classloaders here too,
which is where this is coming from.
##########
File path:
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java
##########
@@ -1044,6 +1166,155 @@ protected void abortPendingUploads(
}
}
+ /**
+ * Scan for active uploads and list them along with a warning message.
+ * Errors are ignored.
+ * @param path output path of job.
+ */
+ protected void warnOnActiveUploads(final Path path) {
+ List<MultipartUpload> pending;
+ try {
+ pending = getCommitOperations()
+ .listPendingUploadsUnderPath(path);
+ } catch (IOException e) {
+ LOG.debug("Failed to list uploads under {}",
+ path, e);
+ return;
+ }
+ if (!pending.isEmpty()) {
+ // log a warning
+ LOG.warn("{} active upload(s) in progress under {}",
+ pending.size(),
+ path);
+ LOG.warn("Either jobs are running concurrently"
+ + " or failed jobs are not being cleaned up");
+ // and the paths + timestamps
+ DateFormat df = DateFormat.getDateTimeInstance();
+ pending.forEach(u ->
+ LOG.info("[{}] {}",
+ df.format(u.getInitiated()),
+ u.getKey()));
+ if (shouldAbortUploadsInCleanup()) {
+ LOG.warn("This committer will abort these uploads in job cleanup");
+ }
+ }
+ }
+
+ /**
+ * Build the job UUID.
+ *
+ * <p>
+ * In MapReduce jobs, the application ID is issued by YARN, and
+ * unique across all jobs.
+ * </p>
+ * <p>
+ * Spark will use a fake app ID based on the current time.
+ * This can lead to collisions on busy clusters.
+ *
+ * </p>
+ * <ol>
+ * <li>Value of
+ * {@link InternalCommitterConstants#FS_S3A_COMMITTER_UUID}.</li>
+ * <li>Value of
+ * {@link InternalCommitterConstants#SPARK_WRITE_UUID}.</li>
+ * <li>If enabled: Self-generated uuid.</li>
+ * <li>If not disabled: Application ID</li>
+ * </ol>
+ * The UUID bonding takes place during construction;
+ * the staging committers use it to set up their wrapped
+ * committer to a path in the cluster FS which is unique to the
+ * job.
+ * <p>
+ * In MapReduce jobs, the application ID is issued by YARN, and
+ * unique across all jobs.
+ * </p>
+ * In {@link #setupJob(JobContext)} the job context's configuration
+ * will be patched
+ * be valid in all sequences where the job has been set up for the
+ * configuration passed in.
+ * <p>
+ * If the option {@link CommitConstants#FS_S3A_COMMITTER_REQUIRE_UUID}
+ * is set, then an external UUID MUST be passed in.
+ * This can be used to verify that the spark engine is reliably setting
+ * unique IDs for staging.
+ * </p>
+ * @param conf job/task configuration
+ * @param jobId job ID from YARN or spark.
+ * @return Job UUID and source of it.
+ * @throws PathCommitException no UUID was found and it was required
+ */
+ public static Pair<String, JobUUIDSource>
+ buildJobUUID(Configuration conf, JobID jobId)
+ throws PathCommitException {
+
+ String jobUUID = conf.getTrimmed(FS_S3A_COMMITTER_UUID, "");
+
+ if (!jobUUID.isEmpty()) {
+ return Pair.of(jobUUID, JobUUIDSource.CommitterUUIDProperty);
+ }
+ // there is no job UUID.
+ // look for one from spark
+ jobUUID = conf.getTrimmed(SPARK_WRITE_UUID, "");
+ if (!jobUUID.isEmpty()) {
+ return Pair.of(jobUUID, JobUUIDSource.SparkWriteUUID);
+ }
+
+ // there is no UUID configuration in the job/task config
+
+ // Check the job hasn't declared a requirement for the UUID.
+ // This allows for fail-fast validation of Spark behavior.
+ if (conf.getBoolean(FS_S3A_COMMITTER_REQUIRE_UUID, false)) {
+ throw new PathCommitException("", E_NO_SPARK_UUID);
+ }
+
+ // see if the job can generate a random UUID
+ if (conf.getBoolean(FS_S3A_COMMITTER_GENERATE_UUID, false)) {
Review comment:
MR jobs where the updated config doesn't get through to the tasks: use
a self-generated ID there and things won't work. And as they know that the app
ID is unique on that YARN cluster, that's all they need.
For my Spark integration tests I turned off auto-generation and enabled the
fail-on-job-ID option, to verify that all operations (RDD, DataFrame, Dataset,
SQL) were passing down the spark.sql option. That helped me find out where it
wasn't being set.
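To make the lookup order discussed here concrete, a minimal standalone sketch follows. It is not the code in the PR: it swaps Hadoop's `Configuration` for a plain `Map`, the class `JobUuidSketch` and its `Result` holder are hypothetical, and the string keys are placeholders named after the constants in the diff rather than the real property names. The last two steps (local generation, then falling back to the job ID) follow the javadoc's ordering, since the quoted hunk stops before that code; the `JobID` source name is likewise an assumption.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** Hypothetical sketch of the job UUID lookup order; not the Hadoop implementation. */
class JobUuidSketch {

  /** Where the UUID came from. The first three names appear in the diff; JobID is assumed. */
  enum Source { CommitterUUIDProperty, SparkWriteUUID, GeneratedLocally, JobID }

  /** Simple holder standing in for Pair<String, JobUUIDSource>. */
  static final class Result {
    final String uuid;
    final Source source;

    Result(String uuid, Source source) {
      this.uuid = uuid;
      this.source = source;
    }
  }

  // Placeholder keys named after the constants in the diff (assumption:
  // these are NOT the real property names).
  static final String COMMITTER_UUID = "FS_S3A_COMMITTER_UUID";
  static final String SPARK_WRITE_UUID = "SPARK_WRITE_UUID";
  static final String REQUIRE_UUID = "FS_S3A_COMMITTER_REQUIRE_UUID";
  static final String GENERATE_UUID = "FS_S3A_COMMITTER_GENERATE_UUID";

  /** Return the trimmed value of a key, or "" if it is unset. */
  private static String trimmed(Map<String, String> conf, String key) {
    return conf.getOrDefault(key, "").trim();
  }

  /** Resolve the job UUID in the order listed in the javadoc. */
  static Result resolve(Map<String, String> conf, String jobId) {
    // 1. an explicitly set committer UUID wins
    String uuid = trimmed(conf, COMMITTER_UUID);
    if (!uuid.isEmpty()) {
      return new Result(uuid, Source.CommitterUUIDProperty);
    }
    // 2. otherwise look for the UUID passed down by Spark
    uuid = trimmed(conf, SPARK_WRITE_UUID);
    if (!uuid.isEmpty()) {
      return new Result(uuid, Source.SparkWriteUUID);
    }
    // 3. fail fast if an externally supplied UUID was required
    if (Boolean.parseBoolean(conf.get(REQUIRE_UUID))) {
      throw new IllegalStateException("No job UUID was passed in and one is required");
    }
    // 4. generate one locally, if enabled
    if (Boolean.parseBoolean(conf.get(GENERATE_UUID))) {
      return new Result(UUID.randomUUID().toString(), Source.GeneratedLocally);
    }
    // 5. fall back to the job/application ID
    return new Result(jobId, Source.JobID);
  }

  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<>();
    conf.put(SPARK_WRITE_UUID, "spark-uuid-1");
    Result r = resolve(conf, "job_0001");
    System.out.println(r.source + " " + r.uuid);
  }
}
```

With the require flag set and generation off, as in the integration-test setup described above, any code path that fails to pass a UUID down hits step 3 and fails immediately rather than quietly falling back to the job ID.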
##########
File path:
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java
##########
@@ -1044,6 +1166,155 @@ protected void abortPendingUploads(
}
}
+ /**
+ * Scan for active uploads and list them along with a warning message.
+ * Errors are ignored.
+ * @param path output path of job.
+ */
+ protected void warnOnActiveUploads(final Path path) {
+ List<MultipartUpload> pending;
+ try {
+ pending = getCommitOperations()
+ .listPendingUploadsUnderPath(path);
+ } catch (IOException e) {
+ LOG.debug("Failed to list uploads under {}",
+ path, e);
+ return;
+ }
+ if (!pending.isEmpty()) {
+ // log a warning
+ LOG.warn("{} active upload(s) in progress under {}",
+ pending.size(),
+ path);
+ LOG.warn("Either jobs are running concurrently"
+ + " or failed jobs are not being cleaned up");
+ // and the paths + timestamps
+ DateFormat df = DateFormat.getDateTimeInstance();
+ pending.forEach(u ->
+ LOG.info("[{}] {}",
+ df.format(u.getInitiated()),
+ u.getKey()));
+ if (shouldAbortUploadsInCleanup()) {
+ LOG.warn("This committer will abort these uploads in job cleanup");
+ }
+ }
+ }
+
+ /**
+ * Build the job UUID.
+ *
+ * <p>
+ * In MapReduce jobs, the application ID is issued by YARN, and
+ * unique across all jobs.
+ * </p>
+ * <p>
+ * Spark will use a fake app ID based on the current time.
+ * This can lead to collisions on busy clusters.
+ *
+ * </p>
+ * <ol>
+ * <li>Value of
+ * {@link InternalCommitterConstants#FS_S3A_COMMITTER_UUID}.</li>
+ * <li>Value of
+ * {@link InternalCommitterConstants#SPARK_WRITE_UUID}.</li>
+ * <li>If enabled: Self-generated uuid.</li>
+ * <li>If not disabled: Application ID</li>
+ * </ol>
+ * The UUID bonding takes place during construction;
+ * the staging committers use it to set up their wrapped
+ * committer to a path in the cluster FS which is unique to the
+ * job.
+ * <p>
+ * In MapReduce jobs, the application ID is issued by YARN, and
+ * unique across all jobs.
+ * </p>
+ * In {@link #setupJob(JobContext)} the job context's configuration
+ * will be patched
+ * be valid in all sequences where the job has been set up for the
+ * configuration passed in.
+ * <p>
+ * If the option {@link CommitConstants#FS_S3A_COMMITTER_REQUIRE_UUID}
+ * is set, then an external UUID MUST be passed in.
+ * This can be used to verify that the spark engine is reliably setting
+ * unique IDs for staging.
+ * </p>
+ * @param conf job/task configuration
+ * @param jobId job ID from YARN or spark.
+ * @return Job UUID and source of it.
+ * @throws PathCommitException no UUID was found and it was required
+ */
+ public static Pair<String, JobUUIDSource>
+ buildJobUUID(Configuration conf, JobID jobId)
+ throws PathCommitException {
+
+ String jobUUID = conf.getTrimmed(FS_S3A_COMMITTER_UUID, "");
+
+ if (!jobUUID.isEmpty()) {
+ return Pair.of(jobUUID, JobUUIDSource.CommitterUUIDProperty);
+ }
+ // there is no job UUID.
+ // look for one from spark
+ jobUUID = conf.getTrimmed(SPARK_WRITE_UUID, "");
+ if (!jobUUID.isEmpty()) {
+ return Pair.of(jobUUID, JobUUIDSource.SparkWriteUUID);
Review comment:
Removed it there.