Shixiong Zhu created SPARK-19965:
------------------------------------

             Summary: DataFrame batch reader may fail to infer partitions when reading FileStreamSink's output
                 Key: SPARK-19965
                 URL: https://issues.apache.org/jira/browse/SPARK-19965
             Project: Spark
          Issue Type: Bug
          Components: Structured Streaming
    Affects Versions: 2.1.0
            Reporter: Shixiong Zhu


Reproducer
{code}
  // Reproducer for SPARK-19965: stream rows into a parquet sink partitioned by "id", then
  // read the sink's output directory back with the batch reader, using a glob path together
  // with the "basePath" option.
  test("partitioned writing and batch reading with 'basePath'") {
    // Source of Int rows; data is fed manually through addData below.
    val inputData = MemoryStream[Int]
    val ds = inputData.toDS()

    // Separate temp directories for the sink output and the streaming checkpoint.
    val outputDir = Utils.createTempDir(namePrefix = 
"stream.output").getCanonicalPath
    val checkpointDir = Utils.createTempDir(namePrefix = 
"stream.checkpoint").getCanonicalPath

    // Declared outside the try so the finally block can stop it; stays null if start() throws.
    var query: StreamingQuery = null

    try {
      // Write (id, value) rows as parquet under outputDir, partitioned by the "id" column.
      query =
        ds.map(i => (i, i * 1000))
          .toDF("id", "value")
          .writeStream
          .partitionBy("id")
          .option("checkpointLocation", checkpointDir)
          .format("parquet")
          .start(outputDir)

      inputData.addData(1, 2, 3)
      // Wait for the added rows to be processed. `failAfter` and `streamingTimeout` are defined
      // outside this snippet; presumably they bound how long the wait may take.
      failAfter(streamingTimeout) {
        query.processAllAvailable()
      }

      // This is the failing call. The glob outputDir + "/*" matches every entry directly under
      // outputDir; per the stack trace in this report the suspicious paths include
      // `_spark_metadata`, and partition inference aborts with
      // "Conflicting directory structures detected" even though "basePath" is set.
      spark.read.option("basePath", outputDir).parquet(outputDir + "/*").show()
    } finally {
      // Always stop the streaming query, if it was started, so the test does not leak it.
      if (query != null) {
        query.stop()
      }
    }
  }
{code}

Stack trace
{code}
[info] - partitioned writing and batch reading with 'basePath' *** FAILED *** (3 seconds, 928 milliseconds)
[info]   java.lang.AssertionError: assertion failed: Conflicting directory structures detected. Suspicious paths:
[info]  ***/stream.output-65e3fa45-595a-4d29-b3df-4c001e321637
[info]  ***/stream.output-65e3fa45-595a-4d29-b3df-4c001e321637/_spark_metadata
[info] 
[info] If provided paths are partition directories, please set "basePath" in the options of the data source to specify the root directory of the table. If there are multiple root directories, please load them separately and then union them.
[info]   at scala.Predef$.assert(Predef.scala:170)
[info]   at 
org.apache.spark.sql.execution.datasources.PartitioningUtils$.parsePartitions(PartitioningUtils.scala:133)
[info]   at 
org.apache.spark.sql.execution.datasources.PartitioningUtils$.parsePartitions(PartitioningUtils.scala:98)
[info]   at 
org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.inferPartitioning(PartitioningAwareFileIndex.scala:156)
[info]   at 
org.apache.spark.sql.execution.datasources.InMemoryFileIndex.partitionSpec(InMemoryFileIndex.scala:54)
[info]   at 
org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.partitionSchema(PartitioningAwareFileIndex.scala:55)
[info]   at 
org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:133)
[info]   at 
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:361)
[info]   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:160)
[info]   at 
org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:536)
[info]   at 
org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:520)
[info]   at 
org.apache.spark.sql.streaming.FileStreamSinkSuite$$anonfun$8.apply$mcV$sp(FileStreamSinkSuite.scala:292)
[info]   at 
org.apache.spark.sql.streaming.FileStreamSinkSuite$$anonfun$8.apply(FileStreamSinkSuite.scala:268)
[info]   at 
org.apache.spark.sql.streaming.FileStreamSinkSuite$$anonfun$8.apply(FileStreamSinkSuite.scala:268)
{code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to