[
https://issues.apache.org/jira/browse/HADOOP-17201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17370953#comment-17370953
]
Daniel Zhi edited comment on HADOOP-17201 at 6/28/21, 11:57 PM:
----------------------------------------------------------------
Our tasks using StagingCommitter can be "stuck" in
S3AFileSystem.deleteUnnecessaryFakeDirectories() for 300~500 seconds because
the code retries with exponential delay (not a problem by itself) and the final
exception is swallowed. The logic collects O(depth) parent directories of an object
path and calls s3.deleteObjects() on them. For example, putObjectDirect() of
s3://bucket/05aee454/root/uuid/staging-uploads/_temporary/0/_temporary/file
leads to a batch delete of the following 7 keys (a minimal sketch of the key derivation follows the list):
# {{s3://bucket/05aee454/root/uuid/staging-uploads/_temporary/0/_temporary/}}
# {{s3://bucket/05aee454/root/uuid/staging-uploads/_temporary/0/}}
# {{s3://bucket/05aee454/root/uuid/staging-uploads/_temporary/}}
# {{s3://bucket/05aee454/root/uuid/staging-uploads/}}
# {{s3://bucket/05aee454/root/uuid/}}
# {{s3://bucket/05aee454/root/}}
# {{s3://bucket/05aee454/}}
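To make the O(depth) fan-out concrete, here is a minimal, self-contained sketch of how the parent "directory marker" keys of an object key can be enumerated. It only illustrates the pattern described above; the class and method names (FakeDirKeys, collectParentKeys) are hypothetical and this is not the actual S3AFileSystem code, which batches keys like these into a single s3.deleteObjects() call.
{code:java}
import java.util.ArrayList;
import java.util.List;

public class FakeDirKeys {

  // Hypothetical helper: for an object key such as
  // "05aee454/root/uuid/staging-uploads/_temporary/0/_temporary/file",
  // return every parent "directory marker" key ("a/b/", "a/", ...) up to the bucket root.
  static List<String> collectParentKeys(String objectKey) {
    List<String> parents = new ArrayList<>();
    int slash = objectKey.lastIndexOf('/');
    while (slash > 0) {
      parents.add(objectKey.substring(0, slash + 1)); // keep the trailing "/"
      slash = objectKey.lastIndexOf('/', slash - 1);
    }
    return parents; // O(depth) keys, deepest first
  }

  public static void main(String[] args) {
    for (String key : collectParentKeys(
        "05aee454/root/uuid/staging-uploads/_temporary/0/_temporary/file")) {
      System.out.println("s3://bucket/" + key); // prints the 7 keys listed above
    }
  }
}
{code}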
Such a batch delete happens in every putObjectDirect() and rename(), and the keys
span multiple levels of the key space. Either simply because there are too many of
these calls, or, more likely, because parallel requests contend on the same parent
keys across levels, this triggers a large number of SlowDown errors from S3. The
retry with exponential delay (and the final swallowing of the exception) makes the
task appear to hang without progress for on the order of 500 seconds, with no
obvious failure at the end.
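The retry-and-swallow behavior can be illustrated with the following standalone sketch. It is a simplification under stated assumptions, not the actual Invoker/retry-policy code; the attempt count and base delay are made-up numbers chosen only to show how exponentially growing sleeps add up to several hundred seconds of apparent hang while no error ever surfaces to the caller.
{code:java}
import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class RetryAndSwallow {

  interface S3Call {
    void run() throws IOException;
  }

  // Hypothetical sketch: retry a throttled S3 call with exponential delay and
  // finally swallow the exception, so the task just appears to hang, then "succeed".
  static void retrySwallowing(S3Call call, int maxAttempts, long baseDelayMs) {
    long delay = baseDelayMs;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        call.run();
        return;                               // success
      } catch (IOException slowDown) {        // e.g. S3 503 SlowDown
        if (attempt == maxAttempts) {
          return;                             // final failure is swallowed: nothing surfaces
        }
        try {
          TimeUnit.MILLISECONDS.sleep(delay); // this is where the thread sits in
        } catch (InterruptedException e) {    // Thread.sleep in the captured jstack
          Thread.currentThread().interrupt();
          return;
        }
        delay *= 2;                           // exponential backoff
      }
    }
  }

  public static void main(String[] args) {
    // With 10 attempts and a 1s base delay, the sleeps alone add up to
    // 1+2+4+...+256 = 511 seconds: the same order as the 300~500s stalls above.
    retrySwallowing(() -> { throw new IOException("SlowDown"); }, 10, 1_000L);
  }
}
{code}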
Most of the fake parent paths (the keys ending with "/") do not actually exist in S3
(except perhaps the immediate one created by createFakeDirectory). It is not clear to
me what the exact purpose of deleteUnnecessaryFakeDirectories() is, but the impact is
severe when S3 is used as the defaultFS (which we do for simplicity, to avoid managing
HDFS, now that S3 is strongly consistent: [https://aws.amazon.com/s3/consistency/]).
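For context, "using S3 as the defaultFS" means roughly the setup sketched below (the bucket name is a placeholder and the snippet is only an illustration, not a recommendation). With this configuration the staging and _temporary paths the job touches are S3 keys as well, which is why the cleanup cost described above hits us so hard.
{code:java}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class S3DefaultFs {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Placeholder bucket: the default filesystem points at S3A instead of HDFS.
    conf.set("fs.defaultFS", "s3a://my-bucket");
    FileSystem fs = FileSystem.get(conf);
    System.out.println(fs.getUri() + " root exists: " + fs.exists(new Path("/")));
  }
}
{code}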
> Spark job with s3acommitter stuck at the last stage
> ---------------------------------------------------
>
> Key: HADOOP-17201
> URL: https://issues.apache.org/jira/browse/HADOOP-17201
> Project: Hadoop Common
> Issue Type: Bug
> Components: fs/s3
> Affects Versions: 3.2.1
> Environment: we are on spark 2.4.5/hadoop 3.2.1 with s3a committer.
> spark.hadoop.fs.s3a.committer.magic.enabled: 'true'
> spark.hadoop.fs.s3a.committer.name: magic
> Reporter: Dyno
> Priority: Major
> Labels: pull-request-available
> Attachments: exec-120.log, exec-125.log, exec-25.log, exec-31.log,
> exec-36.log, exec-44.log, exec-5.log, exec-64.log, exec-7.log
>
> Time Spent: 20m
> Remaining Estimate: 0h
>
> Usually our Spark job takes 1 or 2 hours to finish; occasionally it runs for
> more than 3 hours, at which point we know it is stuck, and the executor usually
> has a stack like this:
> {{"Executor task launch worker for task 78620" #265 daemon prio=5 os_prio=0 tid=0x00007f73e0005000 nid=0x12d waiting on condition [0x00007f74cb291000]
>    java.lang.Thread.State: TIMED_WAITING (sleeping)
>     at java.lang.Thread.sleep(Native Method)
>     at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:349)
>     at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:285)
>     at org.apache.hadoop.fs.s3a.S3AFileSystem.deleteObjects(S3AFileSystem.java:1457)
>     at org.apache.hadoop.fs.s3a.S3AFileSystem.removeKeys(S3AFileSystem.java:1717)
>     at org.apache.hadoop.fs.s3a.S3AFileSystem.deleteUnnecessaryFakeDirectories(S3AFileSystem.java:2785)
>     at org.apache.hadoop.fs.s3a.S3AFileSystem.finishedWrite(S3AFileSystem.java:2751)
>     at org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$finalizeMultipartUpload$1(WriteOperationHelper.java:238)
>     at org.apache.hadoop.fs.s3a.WriteOperationHelper$$Lambda$210/1059071691.execute(Unknown Source)
>     at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
>     at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:265)
>     at org.apache.hadoop.fs.s3a.Invoker$$Lambda$23/586859139.execute(Unknown Source)
>     at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:322)
>     at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:261)
>     at org.apache.hadoop.fs.s3a.WriteOperationHelper.finalizeMultipartUpload(WriteOperationHelper.java:226)
>     at org.apache.hadoop.fs.s3a.WriteOperationHelper.completeMPUwithRetries(WriteOperationHelper.java:271)
>     at org.apache.hadoop.fs.s3a.S3ABlockOutputStream$MultiPartUpload.complete(S3ABlockOutputStream.java:660)
>     at org.apache.hadoop.fs.s3a.S3ABlockOutputStream$MultiPartUpload.access$200(S3ABlockOutputStream.java:521)
>     at org.apache.hadoop.fs.s3a.S3ABlockOutputStream.close(S3ABlockOutputStream.java:385)
>     at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
>     at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:101)
>     at org.apache.parquet.hadoop.util.HadoopPositionOutputStream.close(HadoopPositionOutputStream.java:64)
>     at org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:685)
>     at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:122)
>     at org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:165)
>     at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetOutputWriter.scala:42)
>     at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseResources(FileFormatDataWriter.scala:57)
>     at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:74)
>     at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:247)
>     at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
>     at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
>     at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
>     at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
>     at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>     at org.apache.spark.scheduler.Task.run(Task.scala:123)
>     at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
>    Locked ownable synchronizers:
>     - <0x00000003a57332e0> (a java.util.concurrent.ThreadPoolExecutor$Worker)
> }}
> We captured jstack output on the stuck executors in case it's useful.