[
https://issues.apache.org/jira/browse/SPARK-25966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16681297#comment-16681297
]
Steve Loughran commented on SPARK-25966:
----------------------------------------
bq. Hadoop 3.1.x is not yet officially supported in Spark.
true, but there were some changes in that S3A input stream which it is good to
see if it caused this
h3. better recovery of failures in the underlying read() call
Before:
[https://github.com/apache/hadoop/blob/branch-2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java#L382]
After:
[https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java#L364]
h3. AWS SDK++
an update to a more recent AWS SDK. (1.11.271), which complains a lot more if
you close in input stream while there's data
h3. Adaptive seek policy
When you start off with fadvise=normal the first read is the full file, but if
you do a backward seek is switches to random IO
(fs.s3a.experimental.fadvise=random):
[https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java#L281]
Unless the fadvise=random is set (Best) or fadvise=sequential (completely wrong
for striped columnar formats), the parquet reader is following that codepath.
[~andrioni]: can you put the log {{org.apache.hadoop.fs.s3a.S3AInputStream}}
into DEBUG and see what it says on these failures?
> "EOF Reached the end of stream with bytes left to read" while reading/writing
> to Parquets
> -----------------------------------------------------------------------------------------
>
> Key: SPARK-25966
> URL: https://issues.apache.org/jira/browse/SPARK-25966
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.4.0
> Environment: Spark 2.4.0 (built from RC5 tag) running Hadoop 3.1.1 on
> top of a Mesos cluster. Both input and output Parquet files are on S3.
> Reporter: Alessandro Andrioni
> Priority: Major
>
> I was persistently getting the following exception while trying to run one
> Spark job we have using Spark 2.4.0. It went away after I regenerated from
> scratch all the input Parquet files (generated by another Spark job also
> using Spark 2.4.0).
> Is there a chance that Spark is writing (quite rarely) corrupted Parquet
> files?
> {code:java}
> org.apache.spark.SparkException: Job aborted.
> at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:196)
> at
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
> at
> org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
> at
> org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
> at
> org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
> at
> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
> at
> org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
> at
> org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
> at
> org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
> at
> org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
> at
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
> at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
> at
> org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:668)
> at
> org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:276)
> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:270)
> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:228)
> at
> org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:557)
> (...)
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure:
> Task 312 in stage 682.0 failed 4 times, most recent failure: Lost task 312.3
> in stage 682.0 (TID 235229, 10.130.29.78, executor 77): java.io.EOFException:
> Reached the end of stream with 996 bytes left to read
> at
> org.apache.parquet.io.DelegatingSeekableInputStream.readFully(DelegatingSeekableInputStream.java:104)
> at
> org.apache.parquet.io.DelegatingSeekableInputStream.readFullyHeapBuffer(DelegatingSeekableInputStream.java:127)
> at
> org.apache.parquet.io.DelegatingSeekableInputStream.readFully(DelegatingSeekableInputStream.java:91)
> at
> org.apache.parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:1174)
> at
> org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:805)
> at
> org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.checkEndOfRowGroup(VectorizedParquetRecordReader.java:301)
> at
> org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:256)
> at
> org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:159)
> at
> org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
> at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
> at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:181)
> at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage109.scan_nextBatch_0$(Unknown
> Source)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage109.processNext(Unknown
> Source)
> at
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
> at
> org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:187)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
> at org.apache.spark.scheduler.Task.run(Task.scala:121)
> at
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:748)
> {code}
> This job used to work fine with Spark 2.2.1, and succeeded once we
> regenerated the inputs. This is also one of three jobs that had this issue
> out of the 6000+ we tested.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]