Lantao Jin created SPARK-18227:
----------------------------------
Summary: Parquet file stream sink creates a hidden directory "_spark_metadata" that causes DataFrame reads to fail
Key: SPARK-18227
URL: https://issues.apache.org/jira/browse/SPARK-18227
Project: Spark
Issue Type: Bug
Components: Structured Streaming
Affects Versions: 2.0.1
Reporter: Lantao Jin
When an output directory is used as a Parquet file sink in Structured Streaming, the running streaming job writes all output Parquet files to that directory. However, the sink also creates a hidden directory called "_spark_metadata" inside the output directory. If we then load the Parquet files from the output directory with "load", a RuntimeException is thrown and the task fails.
{code:java}
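// modifiedData is a streaming DataFrame defined elsewhere in the job.
val stream = modifiedData.writeStream.format("parquet")
  .option("checkpointLocation", "/path/ck/")
  .start("/path/out/")
// The glob also matches /path/out/_spark_metadata, so its log files are read as Parquet.
val df1 = spark.read.format("parquet").load("/path/out/*")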
{code}
{panel}
16/11/02 03:49:40 WARN TaskSetManager: Lost task 1.0 in stage 110.0 (TID 3131, cupid044.stratus.phx.ebay.com): java.lang.RuntimeException: hdfs:///path/out/_spark_metadata/0 is not a Parquet file (too small)
	at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:412)
	at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:385)
	at org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:107)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:109)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:367)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:341)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:116)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
{panel}
This happens because ParquetFileReader tries to read the metadata log file in "_spark_metadata" as if it were a Parquet file.
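To illustrate why the log file is rejected: before decoding the footer, a Parquet reader checks that the file is at least 12 bytes long (4-byte magic, 4-byte footer length, 4-byte magic) and that it ends with the magic "PAR1". Below is a simplified, standalone Scala sketch of those two checks, roughly mirroring what produces the "too small" message in ParquetFileReader.readFooter. The object `ParquetFooterCheck` and its messages are made up for illustration and are not parquet-mr's API.

```scala
import java.nio.{ByteBuffer, ByteOrder}
import java.nio.charset.StandardCharsets

// Simplified, illustrative version of the sanity checks a Parquet reader runs
// before parsing the footer. This is NOT the parquet-mr implementation.
object ParquetFooterCheck {
  // Parquet files begin and end with the 4-byte magic "PAR1".
  val Magic: Array[Byte] = "PAR1".getBytes(StandardCharsets.US_ASCII)
  // The footer length is a 4-byte little-endian int stored just before the trailing magic.
  val FooterLengthSize = 4

  // Left(reason) if the bytes cannot be a Parquet file, otherwise Right(footerLength).
  def check(bytes: Array[Byte]): Either[String, Int] = {
    val minLength = Magic.length + FooterLengthSize + Magic.length
    if (bytes.length < minLength) {
      Left("not a Parquet file (too small)")
    } else if (!java.util.Arrays.equals(bytes.takeRight(Magic.length), Magic)) {
      Left("not a Parquet file (no magic number at tail)")
    } else {
      // Decode the little-endian footer length located right before the tail magic.
      val start = bytes.length - Magic.length - FooterLengthSize
      val footerLength = ByteBuffer
        .wrap(bytes, start, FooterLengthSize)
        .order(ByteOrder.LITTLE_ENDIAN)
        .getInt()
      Right(footerLength)
    }
  }
}
```

A short text file fails the length check, and a longer text file fails the tail-magic check; only bytes laid out like a Parquet file get as far as the footer.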
I first thought the cleanest fix would be to move the metadata directory to a different path, but in DataSource.scala the sink has no path information other than the output directory to store it in. So skipping hidden files and directories might be a better approach. However, the stack trace above shows that the failure occurs in initialize() of SpecificParquetRecordReaderBase, which means the files in the hidden metadata directory were already passed down by the caller further up the stack (FileScanRDD). At that level, no format information is available that could be used to skip a hidden directory (and skipping it there would arguably overstep FileScanRDD's responsibility).
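Note that the failing file `_spark_metadata/0` is not itself hidden; only its parent directory is. So a "skip hidden paths" filter has to inspect every path component below the input root, not just the file name. A minimal sketch, assuming the usual Hadoop convention that names starting with "_" or "." are hidden; `HiddenPathFilter` is a hypothetical helper, not existing Spark code, and where such a filter should be hooked into Spark's file listing is exactly the open question here.

```scala
// Hypothetical helper (not part of Spark) for deciding which listed files
// should be handed to the Parquet reader.
object HiddenPathFilter {
  // Assumption: follows the common Hadoop convention that a name starting
  // with "_" or "." marks a hidden file or directory.
  private def isHiddenName(name: String): Boolean =
    name.startsWith("_") || name.startsWith(".")

  // True if the file, or any directory between it and the input root, is hidden.
  // `relativePath` is relative to the input root and uses '/' as the separator.
  def shouldSkip(relativePath: String): Boolean =
    relativePath.split('/').filter(_.nonEmpty).exists(isHiddenName)

  // Keeps only the paths that should be read as data files.
  def dataFiles(relativePaths: Seq[String]): Seq[String] =
    relativePaths.filterNot(shouldSkip)
}
```

For example, `HiddenPathFilter.dataFiles(Seq("part-00000.parquet", "_spark_metadata/0"))` keeps only `part-00000.parquet`. A real implementation would likely need exceptions for files that Parquet itself relies on, such as the `_metadata` and `_common_metadata` summary files.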
So, what is the best way to fix it?
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)