[
https://issues.apache.org/jira/browse/SPARK-26558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Sidhartha updated SPARK-26558:
------------------------------
Description:
h1. How to fix java.util.NoSuchElementException while saving data into HDFS using Spark?
!OKVMg.png!!k5EWv.png!
I'm trying to ingest a Greenplum table into HDFS using the spark-greenplum reader.
Below are the versions of Spark & Scala I am using:
spark-core: 2.0.0
spark-sql: 2.0.0
Scala version: 2.11.8
To do that, I wrote the following code:
{code:java}
// Note: hiveMetaConURL, metaUserName, metaPassword, connectionUrl, devUserName,
// devPassword, textType, precisionColsText and textList are declared earlier in
// the job and are not shown here.
val conf = new SparkConf().setAppName("TEST_YEAR")
  .set("spark.executor.heartbeatInterval", "1200s")
  .set("spark.network.timeout", "12000s")
  .set("spark.sql.inMemoryColumnarStorage.compressed", "true")
  .set("spark.shuffle.compress", "true")
  .set("spark.shuffle.spill.compress", "true")
  .set("spark.sql.orc.filterPushdown", "true")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer.max", "512m")
  .set("spark.serializer", classOf[org.apache.spark.serializer.KryoSerializer].getName)
  .set("spark.streaming.stopGracefullyOnShutdown", "true")
  .set("spark.yarn.driver.memoryOverhead", "8192")
  .set("spark.yarn.executor.memoryOverhead", "8192")
  .set("spark.sql.shuffle.partitions", "400")
  .set("spark.dynamicAllocation.enabled", "false")
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.sql.tungsten.enabled", "true")
  .set("spark.executor.instances", "12")
  .set("spark.executor.memory", "13g")
  .set("spark.executor.cores", "4")
  .set("spark.files.maxPartitionBytes", "268435468")

val flagCol = "del_flag"
val spark = SparkSession.builder().config(conf).master("yarn").enableHiveSupport()
  .config("hive.exec.dynamic.partition", "true")
  .config("hive.exec.dynamic.partition.mode", "nonstrict")
  .getOrCreate()
import spark.implicits._

// Column-type metadata read from the Hive metastore database over JDBC.
val dtypes = spark.read.format("jdbc")
  .option("url", hiveMetaConURL)
  .option("dbtable", "(select source_type, hive_type from hivemeta.types) as gpHiveDataTypes")
  .option("user", metaUserName)
  .option("password", metaPassword)
  .load()

// Source, precision and partition column lists for the table being ingested.
val spColsDF = spark.read.format("jdbc")
  .option("url", hiveMetaConURL)
  .option("dbtable", "(select source_columns, precision_columns, partition_columns from hivemeta.source_table where tablename='gpschema.empdocs') as colsPrecision")
  .option("user", metaUserName)
  .option("password", metaPassword)
  .load()

val dataMapper = dtypes.as[(String, String)].collect().toMap
val gpCols = spColsDF.select("source_columns").map(row => row.getString(0)).collect.mkString(",")
val gpColumns = gpCols.split("\\|").map(e => e.split("\\:")).map(s => s(0)).mkString(",")
val splitColumns = gpCols.split("\\|").toList
val precisionCols = spColsDF.select("precision_columns").collect().map(_.getString(0)).mkString(",")
val partition_columns = spColsDF.select("partition_columns").collect.flatMap(x => x.getAs[String](0).split(","))
val prtn_String_columns = spColsDF.select("partition_columns").collect().map(_.getString(0)).mkString(",")
val partCList = prtn_String_columns.split(",").map(x => col(x))

var splitPrecisionCols = precisionCols.split(",")
for (i <- splitPrecisionCols) {
  precisionColsText += i.concat(s"::${textType} as ").concat(s"${i}_text")
  textList += s"${i}_text:${textType}"
}
val pCols = precisionColsText.mkString(",")
val allColumns = gpColumns.concat("," + pCols)
val allColumnsSeq = allColumns.split(",").toSeq
val allColumnsSeqC = allColumnsSeq.map(x => column(x))
val gpColSeq = gpColumns.split(",").toSeq

def prepareFinalDF(splitColumns: List[String], textList: ListBuffer[String], allColumns: String,
                   dataMapper: Map[String, String], partition_columns: Array[String],
                   spark: SparkSession): DataFrame = {
  val yearDF = spark.read.format("io.pivotal.greenplum.spark.GreenplumRelationProvider")
    .option("url", connectionUrl)
    .option("dbtable", "empdocs")
    .option("dbschema", "gpschema")
    .option("user", devUserName)
    .option("password", devPassword)
    .option("partitionColumn", "header_id")
    .load()
    .where("year=2017 and month=12")
    .select(gpColSeq map col: _*)
    .withColumn(flagCol, lit(0))
  val totalCols: List[String] = splitColumns ++ textList
  val allColsOrdered = yearDF.columns.diff(partition_columns) ++ partition_columns
  val allCols = allColsOrdered.map(colname => org.apache.spark.sql.functions.col(colname))
  val resultDF = yearDF.select(allCols: _*)
  val stringColumns = resultDF.schema.fields.filter(x => x.dataType == StringType).map(s => s.name)
  // Replace newlines and tabs in string columns so the CSV output stays well formed.
  val finalDF = stringColumns.foldLeft(resultDF) { (tempDF, colName) =>
    tempDF.withColumn(colName, regexp_replace(regexp_replace(col(colName), "[\r\n]+", " "), "[\t]+", " "))
  }
  finalDF
}

val dataDF = prepareFinalDF(splitColumns, textList, allColumns, dataMapper, partition_columns, spark)
dataDF.write.format("csv").save("hdfs://usrdev/apps/hive/warehouse/empdocs/")
{code}
When I submit the job, the tasks corresponding to the lines below complete:
{code:java}
val dataMapper = dtypes.as[(String, String)].collect().toMap
val gpCols = spColsDF.select("source_columns").map(row => row.getString(0)).collect.mkString(",")
val precisionCols = spColsDF.select("precision_columns").collect().map(_.getString(0)).mkString(",")
val partition_columns = spColsDF.select("partition_columns").collect.flatMap(x => x.getAs[String](0).split(","))
val prtn_String_columns = spColsDF.select("partition_columns").collect().map(_.getString(0)).mkString(",")
{code}
Once the task that saves the prepared dataframe starts, i.e.:
{noformat}
dataDF.write.format("csv").save("hdfs://usrdev/apps/hive/warehouse/empdocs/"){noformat}
the job ends with the exception:
{noformat}
java.util.NoSuchElementException{noformat}
I am submitting the job using the spark-submit command below:
{code:java}
SPARK_MAJOR_VERSION=2 spark-submit --class com.partition.source.YearPartition \
  --master=yarn \
  --conf spark.ui.port=4090 \
  --driver-class-path /home/hdpdevusr/jars/greenplum-spark_2.11-1.3.0.jar \
  --conf spark.jars=/home/hdpdevusr/jars/greenplum-spark_2.11-1.3.0.jar \
  --executor-cores 3 \
  --executor-memory 13G \
  --keytab /home/hdpdevusr/hdpdevusr.keytab \
  --principal [email protected] \
  --files /usr/hdp/current/spark2-client/conf/hive-site.xml,testconnection.properties \
  --name Splinter \
  --conf spark.executor.extraClassPath=/home/hdpdevusr/jars/greenplum-spark_2.11-1.3.0.jar \
  splinter_2.11-0.1.jar{code}
I can see that the command launches the executors as specified in the code: 12 executors with 4 cores each.
Only 5 out of 48 tasks complete, and then the job ends with the exception:
{code:java}
[Stage 5:> (0 + 48) / 64]18/12/27 10:29:10 WARN TaskSetManager: Lost task 6.0 in stage 5.0 (TID 11, executor 11): java.util.NoSuchElementException: None.get
    at scala.None$.get(Option.scala:347)
    at scala.None$.get(Option.scala:345)
    at io.pivotal.greenplum.spark.jdbc.Jdbc$.copyTable(Jdbc.scala:43)
    at io.pivotal.greenplum.spark.externaltable.GreenplumRowIterator.liftedTree1$1(GreenplumRowIterator.scala:110)
    at io.pivotal.greenplum.spark.externaltable.GreenplumRowIterator.<init>(GreenplumRowIterator.scala:109)
    at io.pivotal.greenplum.spark.GreenplumRDD.compute(GreenplumRDD.scala:49)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Job 5 cancelled because killed via the Web UI
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
    at org.apache.spark.scheduler.DAGScheduler.handleJobCancellation(DAGScheduler.scala:1457)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleStageCancellation$1.apply$mcVI$sp(DAGScheduler.scala:1446)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleStageCancellation$1.apply(DAGScheduler.scala:1439)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleStageCancellation$1.apply(DAGScheduler.scala:1439)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofInt.foreach(ArrayOps.scala:234)
    at org.apache.spark.scheduler.DAGScheduler.handleStageCancellation(DAGScheduler.scala:1439)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1701)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1687)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1676)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:186)
    ... 44 more
18/12/27 10:30:53 WARN ShutdownHookManager: ShutdownHook '$anon$2' timeout, java.util.concurrent.TimeoutException
java.util.concurrent.TimeoutException
    at java.util.concurrent.FutureTask.get(FutureTask.java:205)
    at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:67)
18/12/27 10:30:53 ERROR Utils: Uncaught exception in thread pool-6-thread-1
java.lang.InterruptedException
    at java.lang.Object.wait(Native Method)
    at java.lang.Thread.join(Thread.java:1249)
    at java.lang.Thread.join(Thread.java:1323)
    at org.apache.spark.scheduler.LiveListenerBus.stop(LiveListenerBus.scala:199)
    at org.apache.spark.SparkContext$$anonfun$stop$6.apply$mcV$sp(SparkContext.scala:1919)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1317)
    at org.apache.spark.SparkContext.stop(SparkContext.scala:1918)
    at org.apache.spark.SparkContext$$anonfun$2.apply$mcV$sp(SparkContext.scala:581)
    at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:216)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:188)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1948)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:188)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
    at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745){code}
I don't understand where it went wrong, whether in the code or in any configuration applied to the job.
I posted the same question on Stack Overflow as well; the executor screenshots can be found there:
[https://stackoverflow.com/questions/54002002/how-to-fix-java-util-nosuchelementexception-while-saving-data-into-hdfs-using-sp/54002423?noredirect=1#comment94843141_54002423]
Could anyone let me know how to fix this exception?
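For what it's worth, the {{None.get}} frame in the trace is plain Scala behaviour: calling {{.get}} on an empty {{Option}} throws exactly this {{java.util.NoSuchElementException}}, so the connector presumably looks up something that turns out to be absent. A minimal sketch of that behaviour (not taken from my job, names are made up for illustration):
{code:java}
// Minimal sketch only: scala.None.get always throws
// java.util.NoSuchElementException("None.get"), the same exception
// reported from the Greenplum connector's Jdbc.copyTable frame above.
val missingSetting: Option[String] = None   // hypothetical missing value

// missingSetting.get            // would throw java.util.NoSuchElementException: None.get

// A defensive lookup avoids the exception by supplying a fallback:
val value = missingSetting.getOrElse("default-value")
println(value)                    // prints "default-value"
{code}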
> java.util.NoSuchElementException while saving data into HDFS using Spark
> ------------------------------------------------------------------------
>
> Key: SPARK-26558
> URL: https://issues.apache.org/jira/browse/SPARK-26558
> Project: Spark
> Issue Type: Bug
> Components: Spark Core, Spark Submit
> Affects Versions: 2.0.0
> Reporter: Sidhartha
> Priority: Major
> Attachments: OKVMg.png, k5EWv.png
>
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]