Hi,
It seems that something changed between Spark 1.6.2 and 2.0.0 that I wasn't
expecting.
If I have a DataFrame with records sorted within each partition, and I write it
to Parquet and read it back from Parquet, the records were previously iterated
through in the same order they were written (assuming no shuffle has taken
place). That no longer seems to be the case. Below is the code to reproduce the
issue in a spark-shell.
Was this change expected?
Thanks,
Jason.
import org.apache.spark.sql._
// Returns true iff, within every partition, the values produced by `mapping`
// are non-decreasing under `ordering`.
def isSorted[T](self: DataFrame, mapping: Row => T)(implicit ordering: Ordering[T]) = {
  import self.sqlContext.implicits._
  import ordering._
  self
    .mapPartitions(rows => {
      val isSorted = rows
        .map(mapping)
        .sliding(2) // all adjacent pairs
        .forall {
          case x :: y :: Nil => x <= y
          case x :: Nil => true
          case Nil => true
        }
      Iterator(isSorted)
    })
    .reduce(_ && _)
}
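
(Not part of the repro, but as a quick sanity check of the helper itself:
something like the following should return true in both versions, assuming
sortWithinPartitions sorts records within each partition without a
cross-partition shuffle.)

// Sanity check of isSorted on a DataFrame that is explicitly sorted within
// each partition (on 2.0.0, use spark instead of sqlContext).
isSorted(
  sqlContext.range(1000).toDF("id").repartition(4).sortWithinPartitions("id"),
  _.getAs[Long]("id"))
// expected: true (assumption, not a verified result)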
// in Spark 2.0.0
spark.range(100000).toDF("id").registerTempTable("input")
spark.sql("SELECT id FROM input DISTRIBUTE BY id SORT BY
id").write.mode("overwrite").parquet("input.parquet")
isSorted(spark.read.parquet("input.parquet"), _.getAs[Long]("id"))
// FALSE
// in Spark 1.6.2
sqlContext.range(100000).toDF("id").registerTempTable("input")
sqlContext.sql("SELECT id FROM input DISTRIBUTE BY id SORT BY
id").write.mode("overwrite").parquet("input.parquet")
isSorted(sqlContext.read.parquet("input.parquet"), _.getAs[Long]("id"))
// TRUE