[
https://issues.apache.org/jira/browse/SPARK-16664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Sean Owen resolved SPARK-16664.
-------------------------------
Resolution: Fixed
Fix Version/s: 2.1.0
2.0.1
1.6.3
Issue resolved by pull request 14324
[https://github.com/apache/spark/pull/14324]
> Spark 1.6.2 - Persist call on Data frames with more than 200 columns is
> wiping out the data.
> --------------------------------------------------------------------------------------------
>
> Key: SPARK-16664
> URL: https://issues.apache.org/jira/browse/SPARK-16664
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 1.6.2
> Reporter: Satish Kolli
> Priority: Blocker
> Fix For: 1.6.3, 2.0.1, 2.1.0
>
>
> Calling persist on a data frame with more than 200 columns is removing the
> data from the data frame. This is an issue in Spark 1.6.2. Works with out any
> issues in Spark 1.6.1
> Following test case demonstrates problem. Please let me know if you need any
> additional information. Thanks.
> {code}
> import org.apache.spark._
> import org.apache.spark.rdd.RDD
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.{Row, SQLContext}
> import org.scalatest.FunSuite
> class TestSpark162_1 extends FunSuite {
> test("test data frame with 200 columns") {
> val sparkConfig = new SparkConf()
> val parallelism = 5
> sparkConfig.set("spark.default.parallelism", s"$parallelism")
> sparkConfig.set("spark.sql.shuffle.partitions", s"$parallelism")
> val sc = new SparkContext(s"local[3]", "TestNestedJson", sparkConfig)
> val sqlContext = new SQLContext(sc)
> // create dataframe with 200 columns and fake 200 values
> val size = 200
> val rdd: RDD[Seq[Long]] = sc.parallelize(Seq(Seq.range(0, size)))
> val rowRdd: RDD[Row] = rdd.map(d => Row.fromSeq(d))
> val schemas = List.range(0, size).map(a => StructField("name"+ a,
> LongType, true))
> val testSchema = StructType(schemas)
> val testDf = sqlContext.createDataFrame(rowRdd, testSchema)
> // test value
> assert(testDf.persist.take(1).apply(0).toSeq(100).asInstanceOf[Long] ==
> 100)
> sc.stop()
> }
> test("test data frame with 201 columns") {
> val sparkConfig = new SparkConf()
> val parallelism = 5
> sparkConfig.set("spark.default.parallelism", s"$parallelism")
> sparkConfig.set("spark.sql.shuffle.partitions", s"$parallelism")
> val sc = new SparkContext(s"local[3]", "TestNestedJson", sparkConfig)
> val sqlContext = new SQLContext(sc)
> // create dataframe with 201 columns and fake 201 values
> val size = 201
> val rdd: RDD[Seq[Long]] = sc.parallelize(Seq(Seq.range(0, size)))
> val rowRdd: RDD[Row] = rdd.map(d => Row.fromSeq(d))
> val schemas = List.range(0, size).map(a => StructField("name"+ a,
> LongType, true))
> val testSchema = StructType(schemas)
> val testDf = sqlContext.createDataFrame(rowRdd, testSchema)
> // test value
> assert(testDf.persist.take(1).apply(0).toSeq(100).asInstanceOf[Long] ==
> 100)
> sc.stop()
> }
> }
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]