[https://issues.apache.org/jira/browse/SPARK-11844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17344105#comment-17344105]

Sungwon edited comment on SPARK-11844 at 5/13/21, 7:58 PM:
-----------------------------------------------------------

I agree with your intuition that S3AOutputStream may be closing without 
surfacing an underlying exception.

 

I've primarily seen this issue when writing large parquet files (typically 
larger than 1 or 2 GB), and I have confirmed that these files are corrupt when 
read back through the Spark / parquet reader.
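
For reference, a minimal sketch of such a check (the path is a placeholder, and 
this is only an illustration, not the exact tooling used): reading every row 
group forces every page header to be parsed, so an affected file fails with the 
same "don't know what type" TProtocolException shown in the stack trace below.

{code:java}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;

// Minimal corruption check: decoding every row group forces every page header
// to be read, so a file hit by this bug fails with the same
// "don't know what type" TProtocolException as in the stack trace below.
public class VerifyParquetFile {
    public static void main(String[] args) throws Exception {
        // e.g. s3a://my-bucket/suspect-file.parquet (placeholder path)
        Path path = new Path(args[0]);
        try (ParquetFileReader reader =
                 ParquetFileReader.open(new Configuration(), path)) {
            while (reader.readNextRowGroup() != null) {
                // no-op: we only care that the pages decode without error
            }
            System.out.println("read OK: " + path);
        } catch (Exception e) {
            System.out.println("corrupt file: " + path + " -> " + e);
        }
    }
}
{code}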

Interestingly enough, Hadoop 2.9 has a similar issue with its S3AInputStream, 
except that it actually throws an exception and closes the stream. The root 
cause on the input side is that the S3 client object gets GC'ed prematurely 
while the file read is still in progress. I'm curious whether something similar 
is happening with S3AOutputStream, but instead of surfacing an exception and 
closing the stream, it keeps writing and closes without any error.

[https://stackoverflow.com/questions/22673698/local-variable-in-method-being-garbage-collected-before-method-exits-java]

[https://github.com/apache/hadoop/blob/rel/release-2.9.0/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java]
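
To illustrate the GC behaviour those links describe (only a sketch; Client and 
openStream are made-up names, not the actual S3A classes): in Java, an object 
referenced solely by a local variable may be collected as soon as the method 
stops using that variable, even while work derived from it is still in flight. 
Reference.reachabilityFence (Java 9+) is the usual way to pin it.

{code:java}
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.lang.ref.Cleaner;
import java.lang.ref.Reference;

// Illustration only: 'Client' and 'openStream' are made-up names, not S3A code.
public class PrematureGcDemo {
    private static final Cleaner CLEANER = Cleaner.create();

    static class Client {
        Client() {
            // Runs once this Client instance has been garbage collected.
            CLEANER.register(this, () -> System.out.println("client reclaimed"));
        }
        InputStream openStream() {
            return new ByteArrayInputStream(new byte[16 * 1024 * 1024]);
        }
    }

    static void readAll() throws Exception {
        Client client = new Client();      // the only reference to the client
        InputStream in = client.openStream();
        // 'client' is not used below this point, so the JVM is allowed to
        // collect it here, even though the stream it produced is still open.
        byte[] buf = new byte[8192];
        while (in.read(buf) != -1) {
            System.gc();                   // encourage collection mid-read
        }
        // Pinning the client until the read has finished prevents that:
        Reference.reachabilityFence(client);
    }

    public static void main(String[] args) throws Exception {
        readAll();
    }
}
{code}

If the output path has a similar lifetime issue, the difference would be that 
the write keeps going against a reclaimed client and never surfaces an error, 
which would match the silent corruption described above.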



> can not read class org.apache.parquet.format.PageHeader: don't know what 
> type: 13
> ---------------------------------------------------------------------------------
>
>                 Key: SPARK-11844
>                 URL: https://issues.apache.org/jira/browse/SPARK-11844
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>            Reporter: Yin Huai
>            Priority: Minor
>              Labels: bulk-closed
>
> I got the following error once when I was running a query
> {code}
> java.io.IOException: can not read class org.apache.parquet.format.PageHeader: 
> don't know what type: 13
>       at org.apache.parquet.format.Util.read(Util.java:216)
>       at org.apache.parquet.format.Util.readPageHeader(Util.java:65)
>       at 
> org.apache.parquet.hadoop.ParquetFileReader$Chunk.readPageHeader(ParquetFileReader.java:534)
>       at 
> org.apache.parquet.hadoop.ParquetFileReader$Chunk.readAllPages(ParquetFileReader.java:546)
>       at 
> org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:496)
>       at 
> org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:127)
>       at 
> org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:208)
>       at 
> org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201)
>       at 
> org.apache.spark.rdd.SqlNewHadoopRDD$$anon$1.hasNext(SqlNewHadoopRDD.scala:168)
>       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>       at 
> org.apache.spark.sql.execution.joins.HashJoin$$anon$1.hasNext(HashJoin.scala:77)
>       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>       at 
> org.apache.spark.sql.execution.joins.HashJoin$$anon$1.hasNext(HashJoin.scala:77)
>       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>       at 
> org.apache.spark.sql.execution.joins.HashJoin$$anon$1.hasNext(HashJoin.scala:77)
>       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>       at 
> org.apache.spark.sql.execution.joins.HashJoin$$anon$1.hasNext(HashJoin.scala:77)
>       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>       at 
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:88)
>       at 
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:86)
>       at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:704)
>       at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:704)
>       at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>       at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>       at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>       at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>       at org.apache.spark.scheduler.Task.run(Task.scala:88)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: parquet.org.apache.thrift.protocol.TProtocolException: don't know 
> what type: 13
>       at 
> parquet.org.apache.thrift.protocol.TCompactProtocol.getTType(TCompactProtocol.java:806)
>       at 
> parquet.org.apache.thrift.protocol.TCompactProtocol.readFieldBegin(TCompactProtocol.java:500)
>       at 
> org.apache.parquet.format.InterningProtocol.readFieldBegin(InterningProtocol.java:158)
>       at 
> parquet.org.apache.thrift.protocol.TProtocolUtil.skip(TProtocolUtil.java:108)
> {code}
> The next retry was good. Right now this doesn't seem critical, but let's 
> still track it in case we see it again in the future.


