joy-m created SPARK-24220:
-----------------------------
Summary: java.lang.NullPointerException at
org.apache.spark.sql.execution.UnsafeExternalRowSorter.<init>(UnsafeExternalRowSorter.java:83)
Key: SPARK-24220
URL: https://issues.apache.org/jira/browse/SPARK-24220
Project: Spark
Issue Type: Bug
Components: Java API
Affects Versions: 2.2.0
Reporter: joy-m
def getInputStream(rows:Iterator[Row]): PipedInputStream ={
printMem("before gen string")
val pipedOutputStream = new PipedOutputStream()
(new Thread() {
override def run(){
if(rows == null){
logError("rows is null==========>")
}else{
println(s"record-----start-----${rows.length}")
try {
while (rows.hasNext) {
val row = rows.next()
println(row)
val str = row.mkString("\001") + "\r\n"
println(str)
pipedOutputStream.write(str.getBytes(StandardCharsets.UTF_8))
}
println("record-----end-----")
pipedOutputStream.close()
} catch {
case ex:Exception =>
ex.printStackTrace()
}
}
}
}).start()
println("pipedInPutStream----------")
val pipedInPutStream = new PipedInputStream()
pipedInPutStream.connect(pipedOutputStream)
println("pipedInPutStream--- conn-------")
printMem("after gen string")
pipedInPutStream
}
resDf.coalesce(15).foreachPartition(rows=>{
if(rows == null){
logError("rows is null=========>")
}else{
val copyCmd = s"COPY ${tableName} FROM STDIN with DELIMITER as '\001' NULL as
'null string'"
var con: Connection = null
try {
con = DriverManager.getConnection(adminUrl)
val copyManager = new CopyManager(con.asInstanceOf[BaseConnection])
val start = System.currentTimeMillis()
var count: Long = 0
var copyCount: Long = 0
println("before copyManager=====>")
copyCount += copyManager.copyIn(copyCmd, getInputStream(rows))
println("after copyManager=====>")
val finish = System.currentTimeMillis()
println("copyCount:" + copyCount + " count:" + count + " time(s):" + (finish -
start) / 1000)
con.close()
} catch {
case ex:Exception =>
ex.printStackTrace()
println(s"copyIn error!${ex.toString}")
} finally {
try {
if (con != null) {
con.close()
}
} catch {
case ex:SQLException =>
ex.printStackTrace()
println(s"copyIn error!${ex.toString}")
}
}
}
18/05/09 13:31:30 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception
in thread Thread[Thread-4,5,main]
java.lang.NullPointerException
at
org.apache.spark.sql.execution.UnsafeExternalRowSorter.<init>(UnsafeExternalRowSorter.java:83)
at org.apache.spark.sql.execution.SortExec.createSorter(SortExec.scala:87)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.init(Unknown
Source)
at
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8.apply(WholeStageCodegenExec.scala:392)
at
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8.apply(WholeStageCodegenExec.scala:389)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:844)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:844)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:336)
at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:334)
at
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1038)
at
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1029)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:969)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1029)
at
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:760)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at
org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:100)
at
org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:99)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.size(TraversableOnce.scala:107)
at scala.collection.AbstractIterator.size(Iterator.scala:1336)
at scala.collection.Iterator$class.length(Iterator.scala:1189)
at scala.collection.AbstractIterator.length(Iterator.scala:1336)
at org.admaster.jice.timeExpand.VisitsDivid$$anon$1.run(VisitsDivid.scala:548)
18/05/09 13:31:30 INFO storage.BlockManager: Found block rdd_15_47 remotely
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]