Shixiong Zhu created SPARK-19965:
------------------------------------
Summary: DataFrame batch reader may fail to infer partitions when reading FileStreamSink's output
Key: SPARK-19965
URL: https://issues.apache.org/jira/browse/SPARK-19965
Project: Spark
Issue Type: Bug
Components: Structured Streaming
Affects Versions: 2.1.0
Reporter: Shixiong Zhu
Reproducer
{code}
test("partitioned writing and batch reading with 'basePath'") {
  // Source: an in-memory stream of ints, written by a parquet file sink
  // partitioned on "id".
  val inputData = MemoryStream[Int]
  val ds = inputData.toDS()
  val outputDir =
    Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
  val checkpointDir =
    Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath

  // Starting the query outside the `try` lets it be a non-null `val`: if
  // `start` throws there is nothing to stop, and the exception propagates.
  val query: StreamingQuery = ds
    .map(i => (i, i * 1000))
    .toDF("id", "value")
    .writeStream
    .partitionBy("id")
    .option("checkpointLocation", checkpointDir)
    .format("parquet")
    .start(outputDir)

  try {
    inputData.addData(1, 2, 3)
    failAfter(streamingTimeout) {
      query.processAllAvailable()
    }
    // Batch-read the sink output through a glob with an explicit basePath.
    // The reported failure lists `<outputDir>/_spark_metadata` as a suspicious
    // path, i.e. the glob also picks up the sink's metadata directory and
    // partition inference then sees conflicting directory structures.
    spark.read.option("basePath", outputDir).parquet(outputDir + "/*").show()
  } finally {
    query.stop()
  }
}
{code}
Stack trace
{code}
[info] - partitioned writing and batch reading with 'basePath' *** FAILED *** (3 seconds, 928 milliseconds)
[info] java.lang.AssertionError: assertion failed: Conflicting directory structures detected. Suspicious paths:
[info] ***/stream.output-65e3fa45-595a-4d29-b3df-4c001e321637
[info] ***/stream.output-65e3fa45-595a-4d29-b3df-4c001e321637/_spark_metadata
[info]
[info] If provided paths are partition directories, please set "basePath" in the options of the data source to specify the root directory of the table. If there are multiple root directories, please load them separately and then union them.
[info] at scala.Predef$.assert(Predef.scala:170)
[info] at org.apache.spark.sql.execution.datasources.PartitioningUtils$.parsePartitions(PartitioningUtils.scala:133)
[info] at org.apache.spark.sql.execution.datasources.PartitioningUtils$.parsePartitions(PartitioningUtils.scala:98)
[info] at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.inferPartitioning(PartitioningAwareFileIndex.scala:156)
[info] at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.partitionSpec(InMemoryFileIndex.scala:54)
[info] at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.partitionSchema(PartitioningAwareFileIndex.scala:55)
[info] at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:133)
[info] at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:361)
[info] at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:160)
[info] at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:536)
[info] at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:520)
[info] at org.apache.spark.sql.streaming.FileStreamSinkSuite$$anonfun$8.apply$mcV$sp(FileStreamSinkSuite.scala:292)
[info] at org.apache.spark.sql.streaming.FileStreamSinkSuite$$anonfun$8.apply(FileStreamSinkSuite.scala:268)
[info] at org.apache.spark.sql.streaming.FileStreamSinkSuite$$anonfun$8.apply(FileStreamSinkSuite.scala:268)
{code}
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]