joy-m created SPARK-24220:
-----------------------------

             Summary: java.lang.NullPointerException at 
org.apache.spark.sql.execution.UnsafeExternalRowSorter.<init>(UnsafeExternalRowSorter.java:83)
                 Key: SPARK-24220
                 URL: https://issues.apache.org/jira/browse/SPARK-24220
             Project: Spark
          Issue Type: Bug
          Components: Java API
    Affects Versions: 2.2.0
            Reporter: joy-m


def getInputStream(rows:Iterator[Row]): PipedInputStream ={
 printMem("before gen string")
 val pipedOutputStream = new PipedOutputStream()
 (new Thread() {
 override def run(){
 if(rows == null){
 logError("rows is null==========>")
 }else{
 println(s"record-----start-----${rows.length}")
 try {
 while (rows.hasNext) {
 val row = rows.next()
 println(row)
 val str = row.mkString("\001") + "\r\n"
 println(str)
 pipedOutputStream.write(str.getBytes(StandardCharsets.UTF_8))
 }
 println("record-----end-----")
 pipedOutputStream.close()
 } catch {
 case ex:Exception =>
 ex.printStackTrace()
 }
 }

 }
 }).start()

 println("pipedInPutStream----------")
 val pipedInPutStream = new PipedInputStream()
 pipedInPutStream.connect(pipedOutputStream)
 println("pipedInPutStream--- conn-------")
 printMem("after gen string")
 pipedInPutStream
}

resDf.coalesce(15).foreachPartition(rows=>{
 if(rows == null){
 logError("rows is null=========>")
 }else{
 val copyCmd = s"COPY ${tableName} FROM STDIN with DELIMITER as '\001' NULL as 
'null string'"
 var con: Connection = null
 try {
 con = DriverManager.getConnection(adminUrl)
 val copyManager = new CopyManager(con.asInstanceOf[BaseConnection])
 val start = System.currentTimeMillis()
 var count: Long = 0
 var copyCount: Long = 0

 println("before copyManager=====>")
 copyCount += copyManager.copyIn(copyCmd, getInputStream(rows))
 println("after copyManager=====>")
 val finish = System.currentTimeMillis()
 println("copyCount:" + copyCount + " count:" + count + " time(s):" + (finish - 
start) / 1000)
 con.close()
 } catch {
 case ex:Exception =>
 ex.printStackTrace()
 println(s"copyIn error!${ex.toString}")
 } finally {
 try {
 if (con != null) {
 con.close()
 }
 } catch {
 case ex:SQLException =>
 ex.printStackTrace()
 println(s"copyIn error!${ex.toString}")
 }
 }
 }

 

18/05/09 13:31:30 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception 
in thread Thread[Thread-4,5,main]
java.lang.NullPointerException
 at 
org.apache.spark.sql.execution.UnsafeExternalRowSorter.<init>(UnsafeExternalRowSorter.java:83)
 at org.apache.spark.sql.execution.SortExec.createSorter(SortExec.scala:87)
 at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.init(Unknown
 Source)
 at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8.apply(WholeStageCodegenExec.scala:392)
 at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8.apply(WholeStageCodegenExec.scala:389)
 at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:844)
 at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:844)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
 at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:336)
 at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:334)
 at 
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1038)
 at 
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1029)
 at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:969)
 at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1029)
 at 
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:760)
 at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
 at 
org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:100)
 at 
org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:99)
 at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
 at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
 at scala.collection.Iterator$class.foreach(Iterator.scala:893)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
 at scala.collection.TraversableOnce$class.size(TraversableOnce.scala:107)
 at scala.collection.AbstractIterator.size(Iterator.scala:1336)
 at scala.collection.Iterator$class.length(Iterator.scala:1189)
 at scala.collection.AbstractIterator.length(Iterator.scala:1336)
 at org.admaster.jice.timeExpand.VisitsDivid$$anon$1.run(VisitsDivid.scala:548)
18/05/09 13:31:30 INFO storage.BlockManager: Found block rdd_15_47 remotely



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to