[
https://issues.apache.org/jira/browse/SPARK-19965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Michael Armbrust updated SPARK-19965:
-------------------------------------
Target Version/s: 2.2.0
> DataFrame batch reader may fail to infer partitions when reading
> FileStreamSink's output
> ----------------------------------------------------------------------------------------
>
> Key: SPARK-19965
> URL: https://issues.apache.org/jira/browse/SPARK-19965
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.1.0
> Reporter: Shixiong Zhu
>
> Reproducer:
> {code}
> test("partitioned writing and batch reading with 'basePath'") {
> val inputData = MemoryStream[Int]
> val ds = inputData.toDS()
> val outputDir = Utils.createTempDir(namePrefix =
> "stream.output").getCanonicalPath
> val checkpointDir = Utils.createTempDir(namePrefix =
> "stream.checkpoint").getCanonicalPath
> var query: StreamingQuery = null
> try {
> query =
> ds.map(i => (i, i * 1000))
> .toDF("id", "value")
> .writeStream
> .partitionBy("id")
> .option("checkpointLocation", checkpointDir)
> .format("parquet")
> .start(outputDir)
> inputData.addData(1, 2, 3)
> failAfter(streamingTimeout) {
> query.processAllAvailable()
> }
> spark.read.option("basePath", outputDir).parquet(outputDir +
> "/*").show()
> } finally {
> if (query != null) {
> query.stop()
> }
> }
> }
> {code}
> Stack trace:
> {code}
> [info] - partitioned writing and batch reading with 'basePath' *** FAILED ***
> (3 seconds, 928 milliseconds)
> [info] java.lang.AssertionError: assertion failed: Conflicting directory
> structures detected. Suspicious paths:
> [info] ***/stream.output-65e3fa45-595a-4d29-b3df-4c001e321637
> [info]
> ***/stream.output-65e3fa45-595a-4d29-b3df-4c001e321637/_spark_metadata
> [info]
> [info] If provided paths are partition directories, please set "basePath" in
> the options of the data source to specify the root directory of the table. If
> there are multiple root directories, please load them separately and then
> union them.
> [info] at scala.Predef$.assert(Predef.scala:170)
> [info] at
> org.apache.spark.sql.execution.datasources.PartitioningUtils$.parsePartitions(PartitioningUtils.scala:133)
> [info] at
> org.apache.spark.sql.execution.datasources.PartitioningUtils$.parsePartitions(PartitioningUtils.scala:98)
> [info] at
> org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.inferPartitioning(PartitioningAwareFileIndex.scala:156)
> [info] at
> org.apache.spark.sql.execution.datasources.InMemoryFileIndex.partitionSpec(InMemoryFileIndex.scala:54)
> [info] at
> org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.partitionSchema(PartitioningAwareFileIndex.scala:55)
> [info] at
> org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:133)
> [info] at
> org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:361)
> [info] at
> org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:160)
> [info] at
> org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:536)
> [info] at
> org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:520)
> [info] at
> org.apache.spark.sql.streaming.FileStreamSinkSuite$$anonfun$8.apply$mcV$sp(FileStreamSinkSuite.scala:292)
> [info] at
> org.apache.spark.sql.streaming.FileStreamSinkSuite$$anonfun$8.apply(FileStreamSinkSuite.scala:268)
> [info] at
> org.apache.spark.sql.streaming.FileStreamSinkSuite$$anonfun$8.apply(FileStreamSinkSuite.scala:268)
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]