[ https://issues.apache.org/jira/browse/SPARK-19629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Navige updated SPARK-19629:
---------------------------
    Description: 
Running the two examples below gives different results depending on whether the code runs on Spark 1.6 or Spark 2.1.

h1. What does the example do?
- The code creates a sample DataFrame of 100 random floats.
- The DataFrame is repartitioned into 10 partitions and written to disk as Parquet.
- The DataFrame is then read back from disk.
- The number of partitions of the re-read DataFrame is compared with the number used when writing.

h1. What is the expected behaviour?
The DataFrame re-loaded from disk should have the same number of partitions as was specified when it was written.

h1. Differences between Spark 1.6 and Spark 2
On Spark 1.6 the partitioning is kept, i.e., the re-read DataFrame has 10 partitions, as specified by npartitions. On Spark 2.1 the number of partitions instead equals the number of local worker threads X given by the master URL local[X] when starting Spark.

Looking at the data produced, the Parquet directory contains the same number of files in both Spark versions: Spark 2.1, like Spark 1.6, writes as many files as there are partitions. Only when reading in Spark 2.1 is the number of partitions not preserved.
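In Spark 2.x, file-based sources such as Parquet can pack several small files into one read partition, which would explain why ten tiny part files do not come back as ten partitions. The standalone Scala sketch below models that planning step as we understand it from Spark 2.x's FileSourceScanExec. It is a simplified model, not Spark's code: it assumes the default values of spark.sql.files.maxPartitionBytes (128 MB) and spark.sql.files.openCostInBytes (4 MB), and the object name ReadPartitionModel and the 500-byte file size used below are hypothetical. Each file is charged its size plus the open cost, a target split size is derived from the total divided by defaultParallelism (X for local[X]), and files are packed largest first until a partition is full.

```scala
// Simplified model (not Spark's actual code) of how Spark 2.x plans read
// partitions for file sources such as Parquet.
object ReadPartitionModel {
  // Assumed defaults of spark.sql.files.maxPartitionBytes and
  // spark.sql.files.openCostInBytes.
  val MaxPartitionBytes: Long = 128L * 1024 * 1024
  val OpenCostInBytes: Long = 4L * 1024 * 1024

  /** Number of read partitions for files of the given sizes (in bytes). */
  def numPartitions(fileSizes: Seq[Long], defaultParallelism: Int): Int = {
    require(defaultParallelism > 0, "defaultParallelism must be positive")
    // Every file is charged its length plus a fixed cost for opening it.
    val totalBytes = fileSizes.map(_ + OpenCostInBytes).sum
    val bytesPerCore = totalBytes / defaultParallelism
    // Target partition size: spread the data over all cores, but stay
    // between the open cost and maxPartitionBytes.
    val maxSplitBytes =
      math.min(MaxPartitionBytes, math.max(OpenCostInBytes, bytesPerCore))

    // Split large (splittable) files into chunks of at most maxSplitBytes,
    // then sort the chunks largest first.
    val chunks: Seq[Long] = fileSizes.flatMap { size =>
      (0L until size by maxSplitBytes).map(offset => math.min(maxSplitBytes, size - offset))
    }.sortBy(c => -c)

    // Pack chunks into partitions in order, starting a new partition
    // whenever the next chunk would push the current one past maxSplitBytes.
    var partitions = 0
    var currentSize = 0L
    var currentCount = 0
    for (chunk <- chunks) {
      if (currentCount > 0 && currentSize + chunk > maxSplitBytes) {
        partitions += 1
        currentSize = 0L
        currentCount = 0
      }
      currentSize += chunk + OpenCostInBytes
      currentCount += 1
    }
    if (currentCount > 0) partitions += 1
    partitions
  }
}
```

For example, `ReadPartitionModel.numPartitions(Seq.fill(10)(500L), 4)` returns 4 and the same call with 2 returns 2, so in this model ten tiny files collapse to as many partitions as local[X] threads. With 8 the model returns 5, so the count follows defaultParallelism without always equaling it; the real count should still be checked with read.rdd.getNumPartitions.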

h1. Minimal code example
{code:none}
// run on Spark 1.6 (spark-shell, which provides sc and sqlContext)
import scala.util.Random
import org.apache.spark.sql.types.{StructField, StructType, FloatType}
import org.apache.spark.sql.Row

// 100 single-column rows of random floats
val rdd = sc.parallelize(Seq.fill(100)(Row(Random.nextFloat())))
val df = sqlContext.createDataFrame(rdd, StructType(Seq(StructField("test", FloatType))))
val npartitions = 10
// one Parquet file per partition; overwrite so the snippet can be re-run
df.repartition(npartitions).write.mode("overwrite").parquet("/tmp/test1")
val read = sqlContext.read.parquet("/tmp/test1")
assert(npartitions == read.rdd.getNumPartitions) // true on Spark 1.6
{code}

{code:none}
// run on Spark 2.1 (spark-shell, which provides sc and spark)
import scala.util.Random
import org.apache.spark.sql.types.{StructField, StructType, FloatType}
import org.apache.spark.sql.Row

// 100 single-column rows of random floats
val rdd = sc.parallelize(Seq.fill(100)(Row(Random.nextFloat())))
val df = spark.createDataFrame(rdd, StructType(Seq(StructField("test", FloatType))))
val npartitions = 10
// one Parquet file per partition; overwrite so the snippet can be re-run
df.repartition(npartitions).write.mode("overwrite").parquet("/tmp/test1")
val read = spark.read.parquet("/tmp/test1")
assert(npartitions == read.rdd.getNumPartitions) // false on Spark 2.1
{code}

h1. What could other solutions be, if this is not a bug?
If this behaviour is intended, a read-time parameter could specify whether the data should be repartitioned (depending on the available parallelism) or read "as-is", keeping the partitioning used when writing.

> Partitioning of Parquet is not considered correctly at loading in local[X] mode
> -------------------------------------------------------------------------------
>
>                 Key: SPARK-19629
>                 URL: https://issues.apache.org/jira/browse/SPARK-19629
>             Project: Spark
>          Issue Type: Bug
>          Components: Input/Output, Spark Core
>    Affects Versions: 2.0.0, 2.1.0
>         Environment: Tested using docker run 
> gettyimages/spark:1.6.1-hadoop-2.6 and
> docker run gettyimages/spark:2.1.0-hadoop-2.7.
>            Reporter: Navige
>            Priority: Minor



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
