[ 
https://issues.apache.org/jira/browse/SPARK-16664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15387858#comment-15387858
 ] 

Satish Kolli edited comment on SPARK-16664 at 7/21/16 3:06 PM:
---------------------------------------------------------------

Here is a demonstration from the spark shell: 

{code}
$SPARK_HOME/bin/spark-shell --master local[4]
{code}

{code}
scala>

scala> import org.apache.spark._
import org.apache.spark._

scala> import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.{Row, SQLContext}

scala>

scala> val size = 201
size: Int = 201

scala> val rdd: RDD[Seq[Long]] = sc.parallelize(Seq(Seq.range(0, size)))
rdd: org.apache.spark.rdd.RDD[Seq[Long]] = ParallelCollectionRDD[36] at 
parallelize at <console>:53

scala> val rowRdd: RDD[Row] = rdd.map(d => Row.fromSeq(d))
rowRdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = 
MapPartitionsRDD[37] at map at <console>:55

scala>

scala> val schemas = List.range(0, size).map(a => StructField("name"+ a, 
LongType, true))
schemas: List[org.apache.spark.sql.types.StructField] = 
List(StructField(name0,LongType,true), StructField(name1,LongType,true), 
StructField(name2,LongType,true), StructField(name3,LongType,true), 
StructField(name4,LongType,true), StructField(name5,LongType,true), 
StructField(name6,LongType,true), StructField(name7,LongType,true), 
StructField(name8,LongType,true), StructField(name9,LongType,true), 
StructField(name10,LongType,true), StructField(name11,LongType,true), 
StructField(name12,LongType,true), StructField(name13,LongType,true), 
StructField(name14,LongType,true), StructField(name15,LongType,true), 
StructField(name16,LongType,true), StructField(name17,LongType,true), 
StructField(name18,LongType,true), StructField(name19,LongType,true), 
StructField(name20,LongType,true), StructField...
scala> val testSchema = StructType(schemas)
testSchema: org.apache.spark.sql.types.StructType = 
StructType(StructField(name0,LongType,true), StructField(name1,LongType,true), 
StructField(name2,LongType,true), StructField(name3,LongType,true), 
StructField(name4,LongType,true), StructField(name5,LongType,true), 
StructField(name6,LongType,true), StructField(name7,LongType,true), 
StructField(name8,LongType,true), StructField(name9,LongType,true), 
StructField(name10,LongType,true), StructField(name11,LongType,true), 
StructField(name12,LongType,true), StructField(name13,LongType,true), 
StructField(name14,LongType,true), StructField(name15,LongType,true), 
StructField(name16,LongType,true), StructField(name17,LongType,true), 
StructField(name18,LongType,true), StructField(name19,LongType,true), 
StructField(name20,LongType,true), StructFie...
scala> val testDf = sqlContext.createDataFrame(rowRdd, testSchema)
testDf: org.apache.spark.sql.DataFrame = [name0: bigint, name1: bigint, name2: 
bigint, name3: bigint, name4: bigint, name5: bigint, name6: bigint, name7: 
bigint, name8: bigint, name9: bigint, name10: bigint, name11: bigint, name12: 
bigint, name13: bigint, name14: bigint, name15: bigint, name16: bigint, name17: 
bigint, name18: bigint, name19: bigint, name20: bigint, name21: bigint, name22: 
bigint, name23: bigint, name24: bigint, name25: bigint, name26: bigint, name27: 
bigint, name28: bigint, name29: bigint, name30: bigint, name31: bigint, name32: 
bigint, name33: bigint, name34: bigint, name35: bigint, name36: bigint, name37: 
bigint, name38: bigint, name39: bigint, name40: bigint, name41: bigint, name42: 
bigint, name43: bigint, name44: bigint, name45: bigint, name46: bigint, name47: 
bigin...
scala>
{code}


{color:green}*Take the first row from the data frame before calling 
persist:*{color}
{code}
scala> testDf.take(1)
res9: Array[org.apache.spark.sql.Row] = 
Array([0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,
58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99,100,101,102,103,104,105,106,107,108,109,110,111,112,113,114,115,116,117,118,119,120,
121,122,123,124,125,126,127,128,129,130,131,132,133,134,135,136,137,138,139,140,141,142,143,144,145,146,147,148,149,150,151,152,153,154,155,156,157,158,159,160,161,162,163,164,165,166,167,168,169,170,171,172,173,
174,175,176,177,178,179,180,181,182,183,184,185,186,187,188,189,190,191,192,193,194,195,196,197,198,199,200])
{code}

{color:red}*Take the first row from the data frame after calling 
persist:*{color}
{code}
scala> testDf.persist.take(1)
res10: Array[org.apache.spark.sql.Row] = 
Array([0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
                                                                      
,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0])
{code}


was (Author: skolli):
Here is a demonstration from the spark shell: 

{code}
$SPARK_HOME/bin/spark-shell --master local[4]
{code}

{code}
scala>

scala> import org.apache.spark._
import org.apache.spark._

scala> import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.{Row, SQLContext}

scala>

scala> val size = 201
size: Int = 201

scala> val rdd: RDD[Seq[Long]] = sc.parallelize(Seq(Seq.range(0, size)))
rdd: org.apache.spark.rdd.RDD[Seq[Long]] = ParallelCollectionRDD[36] at 
parallelize at <console>:53

scala> val rowRdd: RDD[Row] = rdd.map(d => Row.fromSeq(d))
rowRdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = 
MapPartitionsRDD[37] at map at <console>:55

scala>

scala> val schemas = List.range(0, size).map(a => StructField("name"+ a, 
LongType, true))
schemas: List[org.apache.spark.sql.types.StructField] = 
List(StructField(name0,LongType,true), StructField(name1,LongType,true), 
StructField(name2,LongType,true), StructField(name3,LongType,true), 
StructField(name4,LongType,true), StructField(name5,LongType,true), 
StructField(name6,LongType,true), StructField(name7,LongType,true), 
StructField(name8,LongType,true), StructField(name9,LongType,true), 
StructField(name10,LongType,true), StructField(name11,LongType,true), 
StructField(name12,LongType,true), StructField(name13,LongType,true), 
StructField(name14,LongType,true), StructField(name15,LongType,true), 
StructField(name16,LongType,true), StructField(name17,LongType,true), 
StructField(name18,LongType,true), StructField(name19,LongType,true), 
StructField(name20,LongType,true), StructField...
scala> val testSchema = StructType(schemas)
testSchema: org.apache.spark.sql.types.StructType = 
StructType(StructField(name0,LongType,true), StructField(name1,LongType,true), 
StructField(name2,LongType,true), StructField(name3,LongType,true), 
StructField(name4,LongType,true), StructField(name5,LongType,true), 
StructField(name6,LongType,true), StructField(name7,LongType,true), 
StructField(name8,LongType,true), StructField(name9,LongType,true), 
StructField(name10,LongType,true), StructField(name11,LongType,true), 
StructField(name12,LongType,true), StructField(name13,LongType,true), 
StructField(name14,LongType,true), StructField(name15,LongType,true), 
StructField(name16,LongType,true), StructField(name17,LongType,true), 
StructField(name18,LongType,true), StructField(name19,LongType,true), 
StructField(name20,LongType,true), StructFie...
scala> val testDf = sqlContext.createDataFrame(rowRdd, testSchema)
testDf: org.apache.spark.sql.DataFrame = [name0: bigint, name1: bigint, name2: 
bigint, name3: bigint, name4: bigint, name5: bigint, name6: bigint, name7: 
bigint, name8: bigint, name9: bigint, name10: bigint, name11: bigint, name12: 
bigint, name13: bigint, name14: bigint, name15: bigint, name16: bigint, name17: 
bigint, name18: bigint, name19: bigint, name20: bigint, name21: bigint, name22: 
bigint, name23: bigint, name24: bigint, name25: bigint, name26: bigint, name27: 
bigint, name28: bigint, name29: bigint, name30: bigint, name31: bigint, name32: 
bigint, name33: bigint, name34: bigint, name35: bigint, name36: bigint, name37: 
bigint, name38: bigint, name39: bigint, name40: bigint, name41: bigint, name42: 
bigint, name43: bigint, name44: bigint, name45: bigint, name46: bigint, name47: 
bigin...
scala>
{code}


{color:green}*Take the first row from the data frame before calling 
persist:*{color}
{code}
scala> testDf.take(1)
res9: Array[org.apache.spark.sql.Row] = 
Array([0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,
58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99,100,101,102,103,104,105,106,107,108,109,110,111,112,113,114,115,116,117,118,119,120,
121,122,123,124,125,126,127,128,129,130,131,132,133,134,135,136,137,138,139,140,141,142,143,144,145,146,147,148,149,150,151,152,153,154,155,156,157,158,159,160,161,162,163,164,165,166,167,168,169,170,171,172,173,
174,175,176,177,178,179,180,181,182,183,184,185,186,187,188,189,190,191,192,193,194,195,196,197,198,199,200])
{code}

{color:red}*Take the first row from the data frame after calling 
persist:*{color}
{code}
scala> testDf.persist.take(1)
res10: Array[org.apache.spark.sql.Row] = 
Array([0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
                                                                      
,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0])

scala>
{code}

> Spark 1.6.2 - Persist call on Data frames with more than 200 columns is 
> wiping out the data.
> --------------------------------------------------------------------------------------------
>
>                 Key: SPARK-16664
>                 URL: https://issues.apache.org/jira/browse/SPARK-16664
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.6.2
>            Reporter: Satish Kolli
>            Priority: Blocker
>
> Calling persist on a data frame with more than 200 columns removes the 
> data from the data frame. This is an issue in Spark 1.6.2; it works without 
> any issues in Spark 1.6.1.
> The following test case demonstrates the problem. Please let me know if you 
> need any additional information. Thanks.
> {code}
> import org.apache.spark._
> import org.apache.spark.rdd.RDD
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.{Row, SQLContext}
> import org.scalatest.FunSuite
> class TestSpark162_1 extends FunSuite {
>   test("test data frame with 200 columns") {
>     val sparkConfig = new SparkConf()
>     val parallelism = 5
>     sparkConfig.set("spark.default.parallelism", s"$parallelism")
>     sparkConfig.set("spark.sql.shuffle.partitions", s"$parallelism")
>     val sc = new SparkContext(s"local[3]", "TestNestedJson", sparkConfig)
>     val sqlContext = new SQLContext(sc)
>     // create dataframe with 200 columns and fake 200 values
>     val size = 200
>     val rdd: RDD[Seq[Long]] = sc.parallelize(Seq(Seq.range(0, size)))
>     val rowRdd: RDD[Row] = rdd.map(d => Row.fromSeq(d))
>     val schemas = List.range(0, size).map(a => StructField("name"+ a, 
> LongType, true))
>     val testSchema = StructType(schemas)
>     val testDf = sqlContext.createDataFrame(rowRdd, testSchema)
>     // test value
>     assert(testDf.persist.take(1).apply(0).toSeq(100).asInstanceOf[Long] == 
> 100)
>     sc.stop()
>   }
>   test("test data frame with 201 columns") {
>     val sparkConfig = new SparkConf()
>     val parallelism = 5
>     sparkConfig.set("spark.default.parallelism", s"$parallelism")
>     sparkConfig.set("spark.sql.shuffle.partitions", s"$parallelism")
>     val sc = new SparkContext(s"local[3]", "TestNestedJson", sparkConfig)
>     val sqlContext = new SQLContext(sc)
>     // create dataframe with 201 columns and fake 201 values
>     val size = 201
>     val rdd: RDD[Seq[Long]] = sc.parallelize(Seq(Seq.range(0, size)))
>     val rowRdd: RDD[Row] = rdd.map(d => Row.fromSeq(d))
>     val schemas = List.range(0, size).map(a => StructField("name"+ a, 
> LongType, true))
>     val testSchema = StructType(schemas)
>     val testDf = sqlContext.createDataFrame(rowRdd, testSchema)
>     // test value
>     assert(testDf.persist.take(1).apply(0).toSeq(100).asInstanceOf[Long] == 
> 100)
>     sc.stop()
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to