[ https://issues.apache.org/jira/browse/SPARK-35406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17350371#comment-17350371 ]
Steve Loughran commented on SPARK-35406:
----------------------------------------
This is actually triggered by garbage collection, not by an AWS SDK bug.
The S3A input stream keeps a reference to the stream returned by the S3Object,
but somehow the outer S3Object can get GC'd, and then everything breaks. The
fix is to keep a reference to that outer object as well.
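A minimal Java sketch of the shape of that fix, assuming the owning object (the SDK's S3Object in S3A) may release or abort its HTTP connection once it becomes unreachable. Holding only the inner stream does not keep the owner reachable, so the wrapper pins the owner itself until close(). `Owner` and `OwnedStream` are hypothetical names standing in for the SDK and S3A classes; this is not the actual Hadoop patch.

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class OwnedStreamSketch {

  /** Hypothetical stand-in for the SDK object (e.g. S3Object) that owns the content stream. */
  static final class Owner implements AutoCloseable {
    private final InputStream content;

    Owner(InputStream content) { this.content = content; }

    InputStream getContent() { return content; }

    @Override
    public void close() throws IOException { content.close(); }
  }

  /** Wrapper that holds a strong reference to the owner, not just to its inner stream. */
  static final class OwnedStream extends FilterInputStream {
    // The essence of the fix: while this stream is reachable, so is the owner.
    private Owner owner;

    OwnedStream(Owner owner) {
      super(owner.getContent());
      this.owner = owner;
    }

    Owner owner() { return owner; }

    @Override
    public void close() throws IOException {
      if (owner != null) {
        try {
          owner.close(); // closing the owner also closes its content stream
        } finally {
          owner = null; // drop the pin only after the stream is finished with
        }
      }
    }
  }

  public static void main(String[] args) throws IOException {
    byte[] data = "hello, s3a".getBytes(StandardCharsets.UTF_8);
    try (OwnedStream in = new OwnedStream(new Owner(new ByteArrayInputStream(data)))) {
      System.out.println(new String(in.readAllBytes(), StandardCharsets.UTF_8));
    }
  }
}
```

The design choice is simply lifetime coupling: the object that may clean up the connection cannot be collected before the reader that depends on it.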
Updating the AWS SDK is unlikely to have any direct effect; it and the required
matching hadoop-* JARs may change the GC profile enough to hide it. You should
be off Hadoop 2.7 for many reasons, especially when working with object stores.
Hadoop 2.7 is also not supported on Java 11, so any problems there will be
"wontfix."
As well as efficient random-access IO over HTTP connections, Hadoop 3.1+ ships
with the S3A committers, which are actually designed to work with S3. The one
you are using relies on rename() being fast and, for a directory, atomic.
Neither holds with S3. At least now that S3 is consistent, rename() will
discover the correct set of files to copy.
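As a sketch of what switching to an S3A committer involves on the Spark side, the Java snippet below collects the relevant properties and prints them in spark-defaults.conf style. It assumes Hadoop 3.1+ and a Spark 3.x build that includes the spark-hadoop-cloud module (so it does not apply to the Spark 2.4.7 / Hadoop 2.7.3 setup in this issue); the property names follow the Spark cloud-integration and Hadoop S3A committer documentation, which should be checked for your exact versions. The class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

public class S3ACommitterConf {

  // Committer names accepted by fs.s3a.committer.name for S3 output.
  private static final Set<String> COMMITTERS = Set.of("directory", "partitioned", "magic");

  /** Builds the Spark properties that replace the rename-based committer for s3a:// output. */
  static Map<String, String> s3aCommitterSettings(String committerName) {
    if (!COMMITTERS.contains(committerName)) {
      throw new IllegalArgumentException("Unknown S3A committer: " + committerName);
    }
    Map<String, String> conf = new LinkedHashMap<>();
    // Select which S3A committer to use.
    conf.put("spark.hadoop.fs.s3a.committer.name", committerName);
    // Route s3a:// output through the S3A committer factory.
    conf.put("spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a",
        "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory");
    // Spark SQL bindings provided by the spark-hadoop-cloud module.
    conf.put("spark.sql.sources.commitProtocolClass",
        "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol");
    conf.put("spark.sql.parquet.output.committer.class",
        "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter");
    return conf;
  }

  public static void main(String[] args) {
    // Print as "key value" lines, the format used by spark-defaults.conf.
    s3aCommitterSettings("directory").forEach((k, v) -> System.out.println(k + " " + v));
  }
}
```

The "magic" committer may additionally need to be enabled explicitly on older Hadoop 3.x releases; the directory committer is the usual starting point.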
> TaskCompletionListenerException: Premature end of Content-Length delimited
> message body
> ---------------------------------------------------------------------------------------
>
> Key: SPARK-35406
> URL: https://issues.apache.org/jira/browse/SPARK-35406
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.4.7
> Environment: Spark 2.4.7
> Built with Hadoop 2.7.3
> hadoop-aws jar 2.7.3
> aws-java-sdk 1.7.4
> EKS 1.18
> Reporter: Avik Aggarwal
> Priority: Major
> Labels: Kubernetes
>
> Running Spark on Kubernetes (EKS 1.18) with the following versions of the
> different components:
> Spark 2.4.7
> Built with Hadoop 2.7.3
> hadoop-aws jar 2.7.3
> aws-java-sdk 1.7.4
>
> I am using the s3a endpoint to read S3 objects from a private repository, and
> the appropriate role has been given to the executors.
>
> I am facing the error below while reading/writing bigger files from/to S3.
> The issue appears with files in the low tens of MBs (10-30 MB), while it
> works for files of a few KBs.
>
> Logs:
> {code:java}
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0 (TID 6, 10.83.7.112, executor 2): org.apache.spark.util.TaskCompletionListenerException: Premature end of Content-Length delimited message body (expected: 3918825; received: 18020
> at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:138)
> at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:116)
> at org.apache.spark.scheduler.Task.run(Task.scala:139)
> at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1925)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1913)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1912)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1912)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948)
> at scala.Option.foreach(Option.scala:257)
> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:948)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2146)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2095)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2084)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:759)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
> at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
> at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
> at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3389)
> at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2550)
> at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2550)
> at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
> at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
> at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
> at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
> at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withAction(Dataset.scala:3369)
> at org.apache.spark.sql.Dataset.head(Dataset.scala:2550)
> at org.apache.spark.sql.Dataset.take(Dataset.scala:2764)
> at org.apache.spark.sql.execution.datasources.csv.TextInputCSVDataSource$.infer(CSVDataSource.scala:236)
> at org.apache.spark.sql.execution.datasources.csv.CSVDataSource.inferSchema(CSVDataSource.scala:68)
> at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.inferSchema(CSVFileFormat.scala:63)
> at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$6.apply(DataSource.scala:194)
> at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$6.apply(DataSource.scala:194)
> at scala.Option.orElse(Option.scala:289)
> at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:193)
> at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:387)
> at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:242)
> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:230)
> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:197)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> at py4j.Gateway.invoke(Gateway.java:282)
> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> at py4j.commands.CallCommand.execute(CallCommand.java:79)
> at py4j.GatewayConnection.run(GatewayConnection.java:238)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.spark.util.TaskCompletionListenerException: Premature end of Content-Length delimited message body (expected: 3918825; received: 18020
> at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:138)
> at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:116)
> at org.apache.spark.scheduler.Task.run(Task.scala:139)
> at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> {code}
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)