[ https://issues.apache.org/jira/browse/SPARK-32633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17180222#comment-17180222 ]

Hyukjin Kwon commented on SPARK-32633:
--------------------------------------

Using {{Row}} seems to work fine locally on the latest master branch:

{code}
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

val offsets = Range(0, 10)
val schema = new StructType(Array(
  StructField("idInt", LongType),
  StructField("idString", StringType),
  StructField("tbool", BooleanType),
  StructField("tdouble", DoubleType)))

val rdd = spark.sparkContext.parallelize(offsets, offsets.size).flatMap(offset => {
  val result = Range(offset * 10, offset * 10 + 10).map(_.toLong)
  result.map(record => {
    val row = new Array[Any](4)
    row.update(0, record)
    row.update(1, record.toString)
    row.update(2, record % 2 == 0)
    row.update(3, record + 0.01)

    Row(row: _*)
  })
})

if (rdd.isEmpty()) println("rdd is empty")
val df = spark.createDataFrame(rdd, schema)
df.show()
{code}

{code}
+-----+--------+-----+-------+
|idInt|idString|tbool|tdouble|
+-----+--------+-----+-------+
|    0|       0| true|   0.01|
|    1|       1|false|   1.01|
|    2|       2| true|   2.01|
|    3|       3|false|   3.01|
|    4|       4| true|   4.01|
|    5|       5|false|   5.01|
|    6|       6| true|   6.01|
|    7|       7|false|   7.01|
|    8|       8| true|   8.01|
|    9|       9|false|   9.01|
|   10|      10| true|  10.01|
|   11|      11|false|  11.01|
|   12|      12| true|  12.01|
|   13|      13|false|  13.01|
|   14|      14| true|  14.01|
|   15|      15|false|  15.01|
|   16|      16| true|  16.01|
|   17|      17|false|  17.01|
|   18|      18| true|  18.01|
|   19|      19|false|  19.01|
+-----+--------+-----+-------+
only showing top 20 rows
{code}
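
For comparison, here is the same pipeline with each row built as a {{GenericRowWithSchema}} cast to {{Row}}, mirroring the pattern in the reported code. This is a minimal sketch that reuses the {{offsets}} and {{schema}} defined above, isolated from the Neo4j-specific parts; I have not verified whether it reproduces the ClassCastException on 2.4.6:

{code}
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema

// Same data as above, but each row is a GenericRowWithSchema cast to Row,
// as in the reported Neo4j reader code.
val rddWithSchema = spark.sparkContext.parallelize(offsets, offsets.size).flatMap { offset =>
  Range(offset * 10, offset * 10 + 10).map(_.toLong).map { record =>
    val values = Array[Any](record, record.toString, record % 2 == 0, record + 0.01)
    new GenericRowWithSchema(values, schema).asInstanceOf[Row]
  }
}

// The report says the exception is thrown at this point.
if (rddWithSchema.isEmpty()) println("rdd is empty")
spark.createDataFrame(rddWithSchema, schema).show()
{code}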

> GenericRowWithSchema cannot be cast to GenTraversableOnce
> ---------------------------------------------------------
>
>                 Key: SPARK-32633
>                 URL: https://issues.apache.org/jira/browse/SPARK-32633
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.4.6, 3.0.0
>         Environment: my computer: macOS 10.15.5
> server: CentOS Linux release 7.7.1908 (Core)
> spark: 2.4.6 / 3.0.0, pre-built for Hadoop 2.7
>            Reporter: ImportMengjie
>            Priority: Major
>
> When I run this on the server using `spark-submit`, or on my computer from 
> the IDE, with spark-2.4.6, it works correctly.
> But when I use `spark-submit` on my computer with spark-2.4.6, or 
> spark-3.0.0 on the server, the ClassCastException is thrown in `rdd.isEmpty()`.
> Here is my code:
> {code:java}
> val rdd = session.sparkContext
>   .parallelize(offsets, offsets.size)
>   .flatMap(offset => {
>     val query  = s"${config.exec} SKIP ${offset.start} LIMIT ${offset.size}"
>     val result = new Neo4jSessionAwareIterator(neo4jConfig, query, Maps.newHashMap(), false)
>     val fields = if (result.hasNext) result.peek().keys().asScala else List()
>     val schema =
>       if (result.hasNext)
>         StructType(
>           fields
>             .map(k => (k, result.peek().get(k).`type`()))
>             .map(keyType => CypherTypes.field(keyType)))
>       else new StructType()
>     result.map(record => {
>       val row = new Array[Any](record.keys().size())
>       for (i <- row.indices)
>         row.update(i, Executor.convert(record.get(i).asObject()))
>       new GenericRowWithSchema(values = row, schema).asInstanceOf[Row]
>     })
>   })
> if (rdd.isEmpty())
>   throw new RuntimeException(
>     "Please check your cypher sentence. because use it search nothing!")
> val schema = rdd.repartition(1).first().schema
> session.createDataFrame(rdd, schema){code}
> Here is my exception message (excerpt):
> {code:java}
> java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to scala.collection.GenTraversableOnce
>         at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:488)
>         at scala.collection.Iterator$SliceIterator.hasNext(Iterator.scala:266)
>         at scala.collection.Iterator.foreach(Iterator.scala:941)
>         at scala.collection.Iterator.foreach$(Iterator.scala:941)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
>         at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
>         at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
>         at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
>         at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
>         at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
>         at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
>         at scala.collection.AbstractIterator.to(Iterator.scala:1429)
>         at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
>         at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
>         at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1429)
>         at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
>         at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
>         at scala.collection.AbstractIterator.toArray(Iterator.scala:1429)
>         at org.apache.spark.rdd.RDD.$anonfun$take$2(RDD.scala:1409)
> Driver stacktrace:
>         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>         at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
>         at org.apache.spark.rdd.RDD.take(RDD.scala:1382)
>         at org.apache.spark.rdd.RDD.$anonfun$isEmpty$1(RDD.scala:1517)
>         at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
>         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>         at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
>         at org.apache.spark.rdd.RDD.isEmpty(RDD.scala:1517)
>         at com.xx.xx.tools.importer.reader.Neo4JReader.read(ServerBaseReader.scala:146){code}
>  
> Thank you for looking!!!


