[
https://issues.apache.org/jira/browse/SPARK-24220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16473800#comment-16473800
]
joy-m commented on SPARK-24220:
-------------------------------
@[~kiszk] I used yarn cluster mode to run my application. When I used
pipeStream, the issue arose. After I changed to fileInputStream, the issue no
longer appeared. I do not know why.
> java.lang.NullPointerException at
> org.apache.spark.sql.execution.UnsafeExternalRowSorter.<init>(UnsafeExternalRowSorter.java:83)
> --------------------------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-24220
> URL: https://issues.apache.org/jira/browse/SPARK-24220
> Project: Spark
> Issue Type: Bug
> Components: Java API
> Affects Versions: 2.2.0
> Reporter: joy-m
> Priority: Major
>
> def getInputStream(rows:Iterator[Row]): PipedInputStream ={
> printMem("before gen string")
> val pipedOutputStream = new PipedOutputStream()
> (new Thread() {
> override def run(){
> if(rows == null){
> logError("rows is null==========>")
> }else{
> println(s"record-----start-----${rows.length}")
> try {
> while (rows.hasNext) {
> val row = rows.next()
> println(row)
> val str = row.mkString("\001") + "\r\n"
> println(str)
> pipedOutputStream.write(str.getBytes(StandardCharsets.UTF_8))
> }
> println("record-----end-----")
> pipedOutputStream.close()
> } catch {
> case ex:Exception =>
> ex.printStackTrace()
> }
> }
> }
> }).start()
> println("pipedInPutStream----------")
> val pipedInPutStream = new PipedInputStream()
> pipedInPutStream.connect(pipedOutputStream)
> println("pipedInPutStream--- conn-------")
> printMem("after gen string")
> pipedInPutStream
> }
> resDf.coalesce(15).foreachPartition(rows=>{
> if(rows == null){
> logError("rows is null=========>")
> }else{
> val copyCmd = s"COPY ${tableName} FROM STDIN with DELIMITER as '\001' NULL
> as 'null string'"
> var con: Connection = null
> try {
> con = DriverManager.getConnection(adminUrl)
> val copyManager = new CopyManager(con.asInstanceOf[BaseConnection])
> val start = System.currentTimeMillis()
> var count: Long = 0
> var copyCount: Long = 0
> println("before copyManager=====>")
> copyCount += copyManager.copyIn(copyCmd, getInputStream(rows))
> println("after copyManager=====>")
> val finish = System.currentTimeMillis()
> println("copyCount:" + copyCount + " count:" + count + " time(s):" + (finish
> - start) / 1000)
> con.close()
> } catch {
> case ex:Exception =>
> ex.printStackTrace()
> println(s"copyIn error!${ex.toString}")
> } finally {
> try {
> if (con != null) {
> con.close()
> }
> } catch {
> case ex:SQLException =>
> ex.printStackTrace()
> println(s"copyIn error!${ex.toString}")
> }
> }
> }
>
> 18/05/09 13:31:30 ERROR util.SparkUncaughtExceptionHandler: Uncaught
> exception in thread Thread[Thread-4,5,main]
> java.lang.NullPointerException
> at
> org.apache.spark.sql.execution.UnsafeExternalRowSorter.<init>(UnsafeExternalRowSorter.java:83)
> at org.apache.spark.sql.execution.SortExec.createSorter(SortExec.scala:87)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.init(Unknown
> Source)
> at
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8.apply(WholeStageCodegenExec.scala:392)
> at
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8.apply(WholeStageCodegenExec.scala:389)
> at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:844)
> at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:844)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:336)
> at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:334)
> at
> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1038)
> at
> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1029)
> at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:969)
> at
> org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1029)
> at
> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:760)
> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at
> org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:100)
> at
> org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:99)
> at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> at scala.collection.TraversableOnce$class.size(TraversableOnce.scala:107)
> at scala.collection.AbstractIterator.size(Iterator.scala:1336)
> at scala.collection.Iterator$class.length(Iterator.scala:1189)
> at scala.collection.AbstractIterator.length(Iterator.scala:1336)
> at
> org.admaster.jice.timeExpand.VisitsDivid$$anon$1.run(VisitsDivid.scala:548)
> 18/05/09 13:31:30 INFO storage.BlockManager: Found block rdd_15_47 remotely
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]