[ https://issues.apache.org/jira/browse/SPARK-19629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Navige updated SPARK-19629:
---------------------------
    Description: 
Running the following two examples will lead to different results depending on whether the code is run using Spark 1.6 or Spark 2.1.

What does the example do?
===========================
- The code creates an example dataframe with random data.
- The dataframe is repartitioned and stored to disk.
- Then the dataframe is re-read from disk.
- The number of partitions of the re-read dataframe is compared to the number originally specified.

What is the expected behaviour?
===========================
The number of partitions specified when storing the dataframe should be the same as when re-loading the dataframe from disk.

On Spark 1.6 the partitioning is kept, i.e., the code example returns 10 partitions as specified using npartitions; on Spark 2.1 the number of partitions instead equals the number of local worker threads (the X in local[X]) specified when starting Spark.

Minimal code example
===========================
{code:scala}
// run on Spark 1.6
import scala.util.Random
import org.apache.spark.sql.types.{StructField, StructType, FloatType}
import org.apache.spark.sql.Row

val rdd = sc.parallelize(Seq.fill(100)(Row(Seq(Random.nextFloat()): _*)))
val df = sqlContext.createDataFrame(rdd, StructType(Seq(StructField("test", FloatType))))
val npartitions = 10
df.repartition(npartitions).write.parquet("/tmp/test1")
val read = sqlContext.read.parquet("/tmp/test1")
assert(npartitions == read.rdd.getNumPartitions) // true on Spark 1.6
{code}

{code:scala}
// run on Spark 2.1
import scala.util.Random
import org.apache.spark.sql.types.{StructField, StructType, FloatType}
import org.apache.spark.sql.Row

val rdd = sc.parallelize(Seq.fill(100)(Row(Seq(Random.nextFloat()): _*)))
val df = spark.sqlContext.createDataFrame(rdd, StructType(Seq(StructField("test", FloatType))))
val npartitions = 10
df.repartition(npartitions).write.parquet("/tmp/test1")
val read = spark.sqlContext.read.parquet("/tmp/test1")
assert(npartitions == read.rdd.getNumPartitions) // false on Spark 2.1
{code}

  was:
Running the following two examples will lead to different results depending on whether the code is run using Spark 1.6 or Spark 2.1.

What does the example do?
===========================
- The code creates an example dataframe with random data.
- The dataframe is repartitioned and stored to disk.
- Then the dataframe is re-read from disk.
- The number of partitions of the re-read dataframe is compared to the number originally specified.

What is the expected behaviour?
===========================
The number of partitions specified when storing the dataframe should be the same as when re-loading the dataframe from disk.

On Spark 1.6 the partitioning is kept, i.e., the code example returns 10 partitions as specified using npartitions; on Spark 2.1 the number of partitions instead equals the number of local worker threads (the X in local[X]) specified when starting Spark.
Minimal code example
===========================
```
// run on Spark 1.6
import scala.util.Random
import org.apache.spark.sql.types.{StructField, StructType, FloatType}
import org.apache.spark.sql.Row

val rdd = sc.parallelize(Seq.fill(100)(Row(Seq(Random.nextFloat()): _*)))
val df = sqlContext.createDataFrame(rdd, StructType(Seq(StructField("test", FloatType))))
val npartitions = 10
df.repartition(npartitions).write.parquet("/tmp/test1")
val read = sqlContext.read.parquet("/tmp/test1")
assert(npartitions == read.rdd.getNumPartitions) // true on Spark 1.6
```

```
// run on Spark 2.1
import scala.util.Random
import org.apache.spark.sql.types.{StructField, StructType, FloatType}
import org.apache.spark.sql.Row

val rdd = sc.parallelize(Seq.fill(100)(Row(Seq(Random.nextFloat()): _*)))
val df = spark.sqlContext.createDataFrame(rdd, StructType(Seq(StructField("test", FloatType))))
val npartitions = 10
df.repartition(npartitions).write.parquet("/tmp/test1")
val read = spark.sqlContext.read.parquet("/tmp/test1")
assert(npartitions == read.rdd.getNumPartitions) // false on Spark 2.1
```

> Partitioning of Parquet is not considered correctly at loading in local[X] mode
> --------------------------------------------------------------------------------
>
>                 Key: SPARK-19629
>                 URL: https://issues.apache.org/jira/browse/SPARK-19629
>             Project: Spark
>          Issue Type: Bug
>          Components: Input/Output, Spark Core
>    Affects Versions: 2.0.0, 2.1.0
>         Environment: Tested using docker run gettyimages/spark:1.6.1-hadoop-2.6 and docker run gettyimages/spark:2.1.0-hadoop-2.7.
>            Reporter: Navige
>            Priority: Minor
>
> Running the following two examples will lead to different results depending on whether the code is run using Spark 1.6 or Spark 2.1.
> What does the example do?
> ===========================
> - The code creates an example dataframe with random data.
> - The dataframe is repartitioned and stored to disk.
> - Then the dataframe is re-read from disk.
> - The number of partitions of the re-read dataframe is compared to the number originally specified.
> What is the expected behaviour?
> ===========================
> The number of partitions specified when storing the dataframe should be the same as when re-loading the dataframe from disk.
> On Spark 1.6 the partitioning is kept, i.e., the code example returns 10 partitions as specified using npartitions; on Spark 2.1 the number of partitions instead equals the number of local worker threads (the X in local[X]) specified when starting Spark.
> Minimal code example
> ===========================
> {code:scala}
> // run on Spark 1.6
> import scala.util.Random
> import org.apache.spark.sql.types.{StructField, StructType, FloatType}
> import org.apache.spark.sql.Row
> val rdd = sc.parallelize(Seq.fill(100)(Row(Seq(Random.nextFloat()): _*)))
> val df = sqlContext.createDataFrame(rdd, StructType(Seq(StructField("test", FloatType))))
> val npartitions = 10
> df.repartition(npartitions).write.parquet("/tmp/test1")
> val read = sqlContext.read.parquet("/tmp/test1")
> assert(npartitions == read.rdd.getNumPartitions) // true on Spark 1.6
> {code}
> {code:scala}
> // run on Spark 2.1
> import scala.util.Random
> import org.apache.spark.sql.types.{StructField, StructType, FloatType}
> import org.apache.spark.sql.Row
> val rdd = sc.parallelize(Seq.fill(100)(Row(Seq(Random.nextFloat()): _*)))
> val df = spark.sqlContext.createDataFrame(rdd, StructType(Seq(StructField("test", FloatType))))
> val npartitions = 10
> df.repartition(npartitions).write.parquet("/tmp/test1")
> val read = spark.sqlContext.read.parquet("/tmp/test1")
> assert(npartitions == read.rdd.getNumPartitions) // false on Spark 2.1
> {code}
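A possible workaround, sketched below, assumes Spark 2.1 in spark-shell and the /tmp/test1 data produced by the example above: since the read-side partition count for file-based sources in Spark 2.x appears to be driven by the available parallelism and the file-split settings (spark.sql.files.maxPartitionBytes, spark.sql.files.openCostInBytes) rather than by the layout that was written, the intended partitioning can be restored explicitly after the read. This is only a minimal sketch, not part of the original report.

{code:scala}
// Workaround sketch (assumption: Spark 2.1, spark-shell, data from the example above at /tmp/test1).
val npartitions = 10

// In local[X] mode the read-side split count appears to follow the available
// parallelism and the file-split settings, not the number of partitions written.
println(spark.sparkContext.defaultParallelism)

val read = spark.read.parquet("/tmp/test1")
println(read.rdd.getNumPartitions) // typically X from local[X], not npartitions

// Restore the intended partitioning explicitly after loading.
val restored = read.repartition(npartitions)
assert(restored.rdd.getNumPartitions == npartitions)
{code}

Tuning the file-split settings would also change the read-side count, but an explicit repartition after the read is the most direct way to recover the original value.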