[
https://issues.apache.org/jira/browse/HADOOP-17935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17421068#comment-17421068
]
Brandon edited comment on HADOOP-17935 at 9/27/21, 10:14 PM:
-------------------------------------------------------------
> turns out each object in a bulk delete request counts as 1 write of your
> quota of 3500/s, so a page of 1000 entries will be rejected if there's more
> writes going on in same partition of a bucket. The AWS SDK throttle retry
> handler blindly retries, and so does the S3A one.
>
> Are you writing files down a deep tree? And is it likely there's other
> writes going on to the same tree in the bucket?
Interesting. We do have lots of Spark jobs that run concurrently throughout the
day and are all writing through S3A. Based on that, one theory is that other
jobs are in the process of cleaning up their pending commits by deleting a
large "directory" and unleashing a thundering herd. That would cause the large
volume of delete requests and 503s. If, at the same time as that thundering
herd, another job tries to begin a write, it could get stuck as described in
this ticket. That seems to explain the symptoms. If this theory is true, then
an upgrade to Hadoop 3.3.1 should help.
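To put some rough, purely illustrative numbers on that theory (the per-object
counting and the 1,000-entry pages are from the comment quoted above; the
directory size is made up):
{noformat}
one cleanup deleting a "directory" of ~10,000 pending objects
  = 10 bulk DeleteObjects pages x 1,000 keys each
  = 10,000 writes counted against the 3,500/s budget for that prefix

a few jobs cleaning up at the same time
  = a burst well past the budget, so any other writer under the same
    prefix (e.g. the mkdirs in setupJob) keeps getting 503s and retrying
{noformat}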
About the depth of the tree: I'm using these configs to specify the storage of
pending commits:
{noformat}
"spark.hadoop.fs.defaultFS": "s3a://some-bucket"
"spark.hadoop.fs.s3a.committer.staging.tmp.path": "/"
{noformat}
So the pending commits live at the root of that bucket, in as shallow a tree as
possible. The S3A directory committer always prepends a user name, which
happens to be "www-data" in the Spark environment. The working directory of the
pending commits for a given write is therefore:
{noformat}
www-data/{Random UUID}/staging-uploads/...
{noformat}
Having that shared "www-data" prefix probably isn't helping with load balancing
the bucket.
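For illustration, here's a minimal sketch of how that key prefix gets assembled
from the two configs above. The class and variable names are mine, not the
actual StagingCommitter internals; it just shows why every job's pending
commits end up under the same "www-data/" prefix:
{noformat}
// Hypothetical sketch only -- not the real StagingCommitter code.
import java.util.UUID;
import org.apache.hadoop.fs.Path;

public class StagingPathSketch {
  public static void main(String[] args) {
    String tmpPath = "/";            // fs.s3a.committer.staging.tmp.path
    String user    = "www-data";     // the user the Spark jobs run as
    String jobUUID = UUID.randomUUID().toString();   // unique per job

    // Resolved against fs.defaultFS = s3a://some-bucket, this yields keys like
    //   www-data/<uuid>/staging-uploads/...
    // so every concurrent job shares the same leading "www-data/" prefix.
    Path workingDir = new Path(tmpPath, user + "/" + jobUUID + "/staging-uploads");
    System.out.println(workingDir);
  }
}
{noformat}
If that's how the keyspace ends up laid out, then presumably all of the
concurrent jobs' staging traffic (including the cleanup delete storms above)
shares the same partition's request budget.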
> Spark job stuck in S3A StagingCommitter::setupJob
> -------------------------------------------------
>
> Key: HADOOP-17935
> URL: https://issues.apache.org/jira/browse/HADOOP-17935
> Project: Hadoop Common
> Issue Type: Bug
> Components: fs/s3
> Affects Versions: 3.2.1
> Environment: Spark 2.4.4
> Hadoop 3.2.1
> "spark.hadoop.fs.s3a.committer.name": "directory"
> Reporter: Brandon
> Priority: Major
>
> When using the S3A directory staging committer, the Spark driver gets stuck
> in a retry loop inside setupJob. Here's a stack trace:
> {noformat}
> org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
> org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
> org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:290)
> org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
> org.apache.spark.sql.execution.SQLExecution$$$Lambda$1753/2105635903.apply(Unknown Source)
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:78)
> org.apache.spark.sql.DataFrameWriter$$Lambda$1752/114484787.apply(Unknown Source)
> org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:676)
> org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:85)
> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:85)
> => holding Monitor(org.apache.spark.sql.execution.QueryExecution@705144571})
> org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> org.apache.spark.sql.execution.SparkPlan$$Lambda$1574/1384254911.apply(Unknown Source)
> org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:155)
> org.apache.spark.sql.execution.SparkPlan$$Lambda$1573/696771575.apply(Unknown Source)
> org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:131)
> org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
> org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
> org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
> => holding Monitor(org.apache.spark.sql.execution.command.DataWritingCommandExec@539925125})
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:170)
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:139)
> org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:163)
> org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter.setupJob(DirectoryStagingCommitter.java:65)
> org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter.setupJob(StagingCommitter.java:458)
> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:355)
> org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:2275)
> org.apache.hadoop.fs.s3a.S3AFileSystem.mkdirs(S3AFileSystem.java:2062)
> org.apache.hadoop.fs.s3a.S3AFileSystem.innerMkdirs(S3AFileSystem.java:2129)
> org.apache.hadoop.fs.s3a.S3AFileSystem.createFakeDirectory(S3AFileSystem.java:2808)
> org.apache.hadoop.fs.s3a.S3AFileSystem.createEmptyObject(S3AFileSystem.java:2833)
> org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:236)
> org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:261)
> org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:322)
> org.apache.hadoop.fs.s3a.Invoker$$Lambda$232/695085082.execute(Unknown Source)
> org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:265)
> org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
> org.apache.hadoop.fs.s3a.S3AFileSystem$$Lambda$1932/855044548.execute(Unknown Source)
> org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$createEmptyObject$13(S3AFileSystem.java:2835)
> org.apache.hadoop.fs.s3a.S3AFileSystem.putObjectDirect(S3AFileSystem.java:1589)
> org.apache.hadoop.fs.s3a.S3AFileSystem.finishedWrite(S3AFileSystem.java:2751)
> org.apache.hadoop.fs.s3a.S3AFileSystem.deleteUnnecessaryFakeDirectories(S3AFileSystem.java:2785)
> org.apache.hadoop.fs.s3a.S3AFileSystem.removeKeys(S3AFileSystem.java:1717)
> org.apache.hadoop.fs.s3a.S3AFileSystem.deleteObjects(S3AFileSystem.java:1457)
> org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:285)
> org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:322)
> org.apache.hadoop.fs.s3a.S3AFileSystem$$Lambda$1933/1245120662.execute(Unknown Source)
> org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$deleteObjects$8(S3AFileSystem.java:1461)
> com.amazonaws.services.s3.AmazonS3Client.deleteObjects(AmazonS3Client.java:2136)
> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4315)
> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4368)
> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
> com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1058)
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1191)
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.pauseBeforeRetry(AmazonHttpClient.java:1653)
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.doPauseBeforeRetry(AmazonHttpClient.java:1679)
> {noformat}
> Another thing of note in this setup: the staging committer is using an S3
> bucket to track pending commits (it used to use HDFS for this, but switched
> to S3 once S3 became strongly consistent).