[ https://issues.apache.org/jira/browse/SPARK-16664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15387858#comment-15387858 ]
Satish Kolli edited comment on SPARK-16664 at 7/21/16 3:06 PM:
---------------------------------------------------------------

Here is a demonstration from the spark shell:

{code}
$SPARK_HOME/bin/spark-shell --master local[4]
{code}

{code}
scala> import org.apache.spark._
import org.apache.spark._

scala> import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.{Row, SQLContext}

scala> val size = 201
size: Int = 201

scala> val rdd: RDD[Seq[Long]] = sc.parallelize(Seq(Seq.range(0, size)))
rdd: org.apache.spark.rdd.RDD[Seq[Long]] = ParallelCollectionRDD[36] at parallelize at <console>:53

scala> val rowRdd: RDD[Row] = rdd.map(d => Row.fromSeq(d))
rowRdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[37] at map at <console>:55

scala> val schemas = List.range(0, size).map(a => StructField("name" + a, LongType, true))
schemas: List[org.apache.spark.sql.types.StructField] = List(StructField(name0,LongType,true), StructField(name1,LongType,true), StructField(name2,LongType,true), StructField(name3,LongType,true), StructField(name4,LongType,true), StructField(name5,LongType,true), StructField(name6,LongType,true), StructField(name7,LongType,true), StructField(name8,LongType,true), StructField(name9,LongType,true), StructField(name10,LongType,true), StructField(name11,LongType,true), StructField(name12,LongType,true), StructField(name13,LongType,true), StructField(name14,LongType,true), StructField(name15,LongType,true), StructField(name16,LongType,true), StructField(name17,LongType,true), StructField(name18,LongType,true), StructField(name19,LongType,true), StructField(name20,LongType,true), StructField...

scala> val testSchema = StructType(schemas)
testSchema: org.apache.spark.sql.types.StructType = StructType(StructField(name0,LongType,true), StructField(name1,LongType,true), StructField(name2,LongType,true), StructField(name3,LongType,true), StructField(name4,LongType,true), StructField(name5,LongType,true), StructField(name6,LongType,true), StructField(name7,LongType,true), StructField(name8,LongType,true), StructField(name9,LongType,true), StructField(name10,LongType,true), StructField(name11,LongType,true), StructField(name12,LongType,true), StructField(name13,LongType,true), StructField(name14,LongType,true), StructField(name15,LongType,true), StructField(name16,LongType,true), StructField(name17,LongType,true), StructField(name18,LongType,true), StructField(name19,LongType,true), StructField(name20,LongType,true), StructFie...

scala> val testDf = sqlContext.createDataFrame(rowRdd, testSchema)
testDf: org.apache.spark.sql.DataFrame = [name0: bigint, name1: bigint, name2: bigint, name3: bigint, name4: bigint, name5: bigint, name6: bigint, name7: bigint, name8: bigint, name9: bigint, name10: bigint, name11: bigint, name12: bigint, name13: bigint, name14: bigint, name15: bigint, name16: bigint, name17: bigint, name18: bigint, name19: bigint, name20: bigint, name21: bigint, name22: bigint, name23: bigint, name24: bigint, name25: bigint, name26: bigint, name27: bigint, name28: bigint, name29: bigint, name30: bigint, name31: bigint, name32: bigint, name33: bigint, name34: bigint, name35: bigint, name36: bigint, name37: bigint, name38: bigint, name39: bigint, name40: bigint, name41: bigint, name42: bigint, name43: bigint, name44: bigint, name45: bigint, name46: bigint, name47: bigin...
{code}

{color:green}*Take the first row from the data frame before calling persist:*{color}

{code}
scala> testDf.take(1)
res9: Array[org.apache.spark.sql.Row] = Array([0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99,100,101,102,103,104,105,106,107,108,109,110,111,112,113,114,115,116,117,118,119,120,121,122,123,124,125,126,127,128,129,130,131,132,133,134,135,136,137,138,139,140,141,142,143,144,145,146,147,148,149,150,151,152,153,154,155,156,157,158,159,160,161,162,163,164,165,166,167,168,169,170,171,172,173,174,175,176,177,178,179,180,181,182,183,184,185,186,187,188,189,190,191,192,193,194,195,196,197,198,199,200])
{code}

{color:red}*Take the first row from the data frame after calling persist:*{color}

{code}
scala> testDf.persist.take(1)
res10: Array[org.apache.spark.sql.Row] = Array([0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0])
{code}
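To make the regression easier to spot than by eyeballing the two arrays, here is a minimal follow-up check for the same shell session. It assumes the {{testDf}} and {{size}} values defined above are still in scope; based on the output above, the second assertion should fail on 1.6.2 because the persisted copy comes back all zeros:

{code}
// Continue in the same spark-shell session; testDf and size are defined above.
val beforePersist = testDf.take(1)(0).toSeq
val afterPersist  = testDf.persist.take(1)(0).toSeq

// Without persist, the row holds the original values 0..200.
assert(beforePersist == Seq.range(0L, size.toLong))

// On 1.6.2 this is expected to fail: after persist every column reads back as 0.
assert(afterPersist == Seq.range(0L, size.toLong))
{code}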
> Spark 1.6.2 - Persist call on Data frames with more than 200 columns is
> wiping out the data.
> --------------------------------------------------------------------------------------------
>
>                 Key: SPARK-16664
>                 URL: https://issues.apache.org/jira/browse/SPARK-16664
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.6.2
>            Reporter: Satish Kolli
>            Priority: Blocker
>
> Calling persist on a data frame with more than 200 columns is removing the
> data from the data frame. This is an issue in Spark 1.6.2. It works without any
> issues in Spark 1.6.1.
> The following test case demonstrates the problem. Please let me know if you need
> any additional information. Thanks.
> {code}
> import org.apache.spark._
> import org.apache.spark.rdd.RDD
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.{Row, SQLContext}
> import org.scalatest.FunSuite
>
> class TestSpark162_1 extends FunSuite {
>
>   test("test data frame with 200 columns") {
>     val sparkConfig = new SparkConf()
>     val parallelism = 5
>     sparkConfig.set("spark.default.parallelism", s"$parallelism")
>     sparkConfig.set("spark.sql.shuffle.partitions", s"$parallelism")
>
>     val sc = new SparkContext(s"local[3]", "TestNestedJson", sparkConfig)
>     val sqlContext = new SQLContext(sc)
>
>     // create dataframe with 200 columns and fake 200 values
>     val size = 200
>     val rdd: RDD[Seq[Long]] = sc.parallelize(Seq(Seq.range(0, size)))
>     val rowRdd: RDD[Row] = rdd.map(d => Row.fromSeq(d))
>
>     val schemas = List.range(0, size).map(a => StructField("name" + a, LongType, true))
>     val testSchema = StructType(schemas)
>     val testDf = sqlContext.createDataFrame(rowRdd, testSchema)
>
>     // test value
>     assert(testDf.persist.take(1).apply(0).toSeq(100).asInstanceOf[Long] == 100)
>
>     sc.stop()
>   }
>
>   test("test data frame with 201 columns") {
>     val sparkConfig = new SparkConf()
>     val parallelism = 5
>     sparkConfig.set("spark.default.parallelism", s"$parallelism")
>     sparkConfig.set("spark.sql.shuffle.partitions", s"$parallelism")
>
>     val sc = new SparkContext(s"local[3]", "TestNestedJson", sparkConfig)
>     val sqlContext = new SQLContext(sc)
>
>     // create dataframe with 201 columns and fake 201 values
>     val size = 201
>     val rdd: RDD[Seq[Long]] = sc.parallelize(Seq(Seq.range(0, size)))
>     val rowRdd: RDD[Row] = rdd.map(d => Row.fromSeq(d))
>
>     val schemas = List.range(0, size).map(a => StructField("name" + a, LongType, true))
>     val testSchema = StructType(schemas)
>     val testDf = sqlContext.createDataFrame(rowRdd, testSchema)
>
>     // test value
>     assert(testDf.persist.take(1).apply(0).toSeq(100).asInstanceOf[Long] == 100)
>
>     sc.stop()
>   }
> }
> {code}
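For anyone triaging this without a test framework, here is a sketch of the 201-column case as a plain main class that can be packaged and run with spark-submit or from an IDE. The object name {{PersistWideRowRepro}} is just a placeholder I made up; the logic mirrors the second test above. On 1.6.1 both printed lines should show the same values, while per this report the second line on 1.6.2 is expected to be all zeros.

{code}
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SQLContext}

// Placeholder object name; mirrors the 201-column test case above without ScalaTest.
object PersistWideRowRepro {
  def main(args: Array[String]): Unit = {
    val sparkConfig = new SparkConf()
    sparkConfig.set("spark.default.parallelism", "5")
    sparkConfig.set("spark.sql.shuffle.partitions", "5")

    val sc = new SparkContext("local[3]", "PersistWideRowRepro", sparkConfig)
    val sqlContext = new SQLContext(sc)

    // 201 columns, one row with values 0..200
    val size = 201
    val rdd: RDD[Seq[Long]] = sc.parallelize(Seq(Seq.range(0, size)))
    val rowRdd: RDD[Row] = rdd.map(d => Row.fromSeq(d))

    val schema = StructType(List.range(0, size).map(a => StructField("name" + a, LongType, true)))
    val testDf = sqlContext.createDataFrame(rowRdd, schema)

    // Print the first few values before and after persist for comparison.
    println("before persist: " + testDf.take(1)(0).toSeq.take(5).mkString(","))
    println("after persist:  " + testDf.persist.take(1)(0).toSeq.take(5).mkString(","))

    sc.stop()
  }
}
{code}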