[ 
https://issues.apache.org/jira/browse/SPARK-18227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15651778#comment-15651778
 ] 

Michael Armbrust commented on SPARK-18227:
------------------------------------------

The {{_spark_metadata}} directory holds the transaction log, which is used to 
ensure that the FileSink produces exactly-once results, even in the case of 
failures.  The issue here is that you are using a glob (i.e. the {{*}}), which 
causes Spark SQL to read the individual files rather than loading the 
metadata.  Remove the {{*}} and the problem should go away.
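
As a minimal sketch of that suggestion, the read from the report can point at 
the sink's output directory itself instead of at a glob. The object name 
{{ReadStreamingSinkOutput}}, the helper {{readSinkOutput}}, and the local 
master setting are hypothetical and only for illustration; the path is the 
one from the report.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object ReadStreamingSinkOutput {
  // Hypothetical helper: load the sink's output directory as a whole.
  // The path is passed without a trailing "*" so that Spark resolves the
  // directory rather than every entry inside it.
  def readSinkOutput(spark: SparkSession, outDir: String): DataFrame =
    spark.read.format("parquet").load(outDir)

  def main(args: Array[String]): Unit = {
    // Assumption: a local session for illustration; use your own cluster
    // configuration in practice.
    val spark = SparkSession.builder()
      .appName("read-streaming-sink-output")
      .master("local[*]")
      .getOrCreate()

    // "/path/out/" instead of "/path/out/*" from the original report.
    val df1 = readSinkOutput(spark, "/path/out/")
    df1.printSchema()

    spark.stop()
  }
}
```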

> Parquet file stream sink create a hidden directory "_spark_metadata" cause 
> the DataFrame read from directory failed
> -------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-18227
>                 URL: https://issues.apache.org/jira/browse/SPARK-18227
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.0.1
>            Reporter: Lantao Jin
>
> When we set an output directory as a streaming sink with the parquet format 
> in Structured Streaming, all output parquet files are written to this 
> directory while the streaming job runs. However, the sink also creates a 
> hidden directory called "_spark_metadata" in the output directory. If we load 
> the parquet files from the output directory with "load", a RuntimeException 
> is thrown and the task fails.
> {code:java}
> val stream = modifiedData.writeStream.format("parquet")
> .option("checkpointLocation", "/path/ck/")
> .start("/path/out/")
> val df1 = spark.read.format("parquet").load("/path/out/*")
> {code}
> {panel}
> 16/11/02 03:49:40 WARN TaskSetManager: Lost task 1.0 in stage 110.0 (TID 3131, cupid044.stratus.phx.ebay.com): java.lang.RuntimeException: hdfs:///path/out/_spark_metadata/0 is not a Parquet file (too small)
>         at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:412)
>         at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:385)
>         at org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:107)
>         at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:109)
>         at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:367)
>         at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:341)
>         at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:116)
>         at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91)
>         at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown Source)
>         at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown Source)
>         at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
>         at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>         at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>         at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
> {panel}
> That's because the ParquetFileReader tries to read the metadata file as a 
> parquet file. 
> I thought the smoothest fix would be to move the metadata directory to 
> another path, but judging from the code in DataSource.scala, it has no path 
> information other than the out directory path in which to store it. So 
> skipping hidden files and paths may be a better approach. However, the stack 
> trace above shows that the failure occurs in initialize() in 
> SpecificParquetRecordReaderBase, which means the metadata files in the hidden 
> directory have already been traversed by the upper-level caller 
> (FileScanRDD). At that level, no format information is available with which 
> to skip a hidden directory (or doing so would overstep its authority).
> So, what is the best way to fix it?
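
The "skip hidden files and paths" idea raised in the description can be 
sketched as a plain path filter. This is only an illustration of the idea, 
not Spark's actual implementation: the object {{HiddenPathFilter}} and its 
methods are hypothetical, and the rule that names starting with "_" or "." 
are hidden is an assumption modelled on the common Hadoop convention.

```scala
object HiddenPathFilter {
  // Assumption: a path component is "hidden" if it starts with "_" or ".".
  def isHiddenName(name: String): Boolean =
    name.startsWith("_") || name.startsWith(".")

  // True if any component of the path is hidden, so files that live inside
  // a hidden directory (e.g. _spark_metadata/0) are caught as well.
  def hasHiddenComponent(path: String): Boolean =
    path.split('/').exists(isHiddenName)

  def main(args: Array[String]): Unit = {
    val candidates = Seq(
      "hdfs:///path/out/_spark_metadata/0",
      "hdfs:///path/out/part-00000.parquet" // hypothetical data file name
    )
    // Keep only the paths that do not pass through a hidden directory.
    candidates.filterNot(hasHiddenComponent).foreach(println)
  }
}
```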



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
