[ 
https://issues.apache.org/jira/browse/HADOOP-17935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17541615#comment-17541615
 ] 

Steve Loughran commented on HADOOP-17935:
-----------------------------------------

revisiting. 
* that staging path  "spark.hadoop.fs.s3a.committer.staging.tmp.path": MUST NOT 
BE ON S3. it relies on the  FileOutputCommitter v1 commit algorithm, which 
isn't safe there
* that stack trace is from an older release which doesn't schedule the work in 
a separate (unbounded) thread pool. 

i have vague memories of deadlock here if work in the bounded thread pool was 
using the same pool. which could happen if an s3a output stream was being 
written in the bounded threadpool itself

[~brandonvin] are you seeing this after upgrading to a hadoop 3.3.x release 
(ideally 3.3.3?)

if so, enable debug logging on org.apache.hadoop.fs.s3a.Invoker  and see if you 
see messages about retries. that will let us know whether it is retry problems 
or some deadlock.
currently we are only using that threadpool for rename copy operations. I don't 
think it is happening here.

I'm going to close this as a

> Spark job stuck in S3A StagingCommitter::setupJob
> -------------------------------------------------
>
>                 Key: HADOOP-17935
>                 URL: https://issues.apache.org/jira/browse/HADOOP-17935
>             Project: Hadoop Common
>          Issue Type: Bug
>          Components: fs/s3
>    Affects Versions: 3.2.1
>         Environment: Spark 2.4.4
> Hadoop 3.2.1
> "spark.hadoop.fs.s3a.committer.name": "directory"
>            Reporter: Brandon
>            Priority: Major
>
> This is using the S3A directory staging committer, the Spark driver gets 
> stuck in a retry loop inside setupJob. Here's a stack trace:
> {noformat}
> org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
> org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
> org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:290)
> org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
> org.apache.spark.sql.execution.SQLExecution$$$Lambda$1753/2105635903.apply(Unknown
>  Source)
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:78)
> org.apache.spark.sql.DataFrameWriter$$Lambda$1752/114484787.apply(Unknown 
> Source)
> org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:676)
> org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:85)
> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:85)
>  => holding Monitor(org.apache.spark.sql.execution.QueryExecution@705144571})
> org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> org.apache.spark.sql.execution.SparkPlan$$Lambda$1574/1384254911.apply(Unknown
>  Source)
> org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:155)
> org.apache.spark.sql.execution.SparkPlan$$Lambda$1573/696771575.apply(Unknown 
> Source)
> org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:131)
> org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
> org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
> org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
>  => holding 
> Monitor(org.apache.spark.sql.execution.command.DataWritingCommandExec@539925125})
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:170)
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:139)
> org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:163)
> org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter.setupJob(DirectoryStagingCommitter.java:65)
> org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter.setupJob(StagingCommitter.java:458)
> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:355)
> org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:2275)
> org.apache.hadoop.fs.s3a.S3AFileSystem.mkdirs(S3AFileSystem.java:2062)
> org.apache.hadoop.fs.s3a.S3AFileSystem.innerMkdirs(S3AFileSystem.java:2129)
> org.apache.hadoop.fs.s3a.S3AFileSystem.createFakeDirectory(S3AFileSystem.java:2808)
> org.apache.hadoop.fs.s3a.S3AFileSystem.createEmptyObject(S3AFileSystem.java:2833)
> org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:236)
> org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:261)
> org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:322)
> org.apache.hadoop.fs.s3a.Invoker$$Lambda$232/695085082.execute(Unknown Source)
> org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:265)
> org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
> org.apache.hadoop.fs.s3a.S3AFileSystem$$Lambda$1932/855044548.execute(Unknown 
> Source)
> org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$createEmptyObject$13(S3AFileSystem.java:2835)
> org.apache.hadoop.fs.s3a.S3AFileSystem.putObjectDirect(S3AFileSystem.java:1589)
> org.apache.hadoop.fs.s3a.S3AFileSystem.finishedWrite(S3AFileSystem.java:2751)
> org.apache.hadoop.fs.s3a.S3AFileSystem.deleteUnnecessaryFakeDirectories(S3AFileSystem.java:2785)
> org.apache.hadoop.fs.s3a.S3AFileSystem.removeKeys(S3AFileSystem.java:1717)
> org.apache.hadoop.fs.s3a.S3AFileSystem.deleteObjects(S3AFileSystem.java:1457)
> org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:285)
> org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:322)
> org.apache.hadoop.fs.s3a.S3AFileSystem$$Lambda$1933/1245120662.execute(Unknown
>  Source)
> org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$deleteObjects$8(S3AFileSystem.java:1461)
> com.amazonaws.services.s3.AmazonS3Client.deleteObjects(AmazonS3Client.java:2136)
> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4315)
> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4368)
> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
> com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1058)
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1191)
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.pauseBeforeRetry(AmazonHttpClient.java:1653)
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.doPauseBeforeRetry(AmazonHttpClient.java:1679)
> {noformat}
> Another thing of note in this setup, is the staging committer is using an S3 
> bucket to track pending commits (used to use HDFS for this, but switched to 
> S3 once it became strongly consistent).



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to