[ 
https://issues.apache.org/jira/browse/SPARK-19965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15928657#comment-15928657
 ] 

Shixiong Zhu commented on SPARK-19965:
--------------------------------------

[~lwlin] I think we can just ignore the “_spark_metadata” directory in InMemoryFileIndex.
Could you try it?

> DataFrame batch reader may fail to infer partitions when reading FileStreamSink's output
> ----------------------------------------------------------------------------------------
>
>                 Key: SPARK-19965
>                 URL: https://issues.apache.org/jira/browse/SPARK-19965
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.1.0
>            Reporter: Shixiong Zhu
>
> Reproducer
> {code}
>   test("partitioned writing and batch reading with 'basePath'") {
>     val inputData = MemoryStream[Int]
>     val ds = inputData.toDS()
>     val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
>     val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
>     var query: StreamingQuery = null
>     try {
>       query =
>         ds.map(i => (i, i * 1000))
>           .toDF("id", "value")
>           .writeStream
>           .partitionBy("id")
>           .option("checkpointLocation", checkpointDir)
>           .format("parquet")
>           .start(outputDir)
>       inputData.addData(1, 2, 3)
>       failAfter(streamingTimeout) {
>         query.processAllAvailable()
>       }
>       spark.read.option("basePath", outputDir).parquet(outputDir + "/*").show()
>     } finally {
>       if (query != null) {
>         query.stop()
>       }
>     }
>   }
> {code}
> Stack trace
> {code}
> [info] - partitioned writing and batch reading with 'basePath' *** FAILED *** (3 seconds, 928 milliseconds)
> [info]   java.lang.AssertionError: assertion failed: Conflicting directory structures detected. Suspicious paths:
> [info]        ***/stream.output-65e3fa45-595a-4d29-b3df-4c001e321637
> [info]        ***/stream.output-65e3fa45-595a-4d29-b3df-4c001e321637/_spark_metadata
> [info] 
> [info] If provided paths are partition directories, please set "basePath" in the options of the data source to specify the root directory of the table. If there are multiple root directories, please load them separately and then union them.
> [info]   at scala.Predef$.assert(Predef.scala:170)
> [info]   at 
> org.apache.spark.sql.execution.datasources.PartitioningUtils$.parsePartitions(PartitioningUtils.scala:133)
> [info]   at 
> org.apache.spark.sql.execution.datasources.PartitioningUtils$.parsePartitions(PartitioningUtils.scala:98)
> [info]   at 
> org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.inferPartitioning(PartitioningAwareFileIndex.scala:156)
> [info]   at 
> org.apache.spark.sql.execution.datasources.InMemoryFileIndex.partitionSpec(InMemoryFileIndex.scala:54)
> [info]   at 
> org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.partitionSchema(PartitioningAwareFileIndex.scala:55)
> [info]   at 
> org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:133)
> [info]   at 
> org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:361)
> [info]   at 
> org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:160)
> [info]   at 
> org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:536)
> [info]   at 
> org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:520)
> [info]   at 
> org.apache.spark.sql.streaming.FileStreamSinkSuite$$anonfun$8.apply$mcV$sp(FileStreamSinkSuite.scala:292)
> [info]   at 
> org.apache.spark.sql.streaming.FileStreamSinkSuite$$anonfun$8.apply(FileStreamSinkSuite.scala:268)
> [info]   at 
> org.apache.spark.sql.streaming.FileStreamSinkSuite$$anonfun$8.apply(FileStreamSinkSuite.scala:268)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to