[ https://issues.apache.org/jira/browse/SPARK-22558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16259472#comment-16259472 ]

KhajaAsmath Mohammed commented on SPARK-22558:
----------------------------------------------

Any suggestions for this issue?

> SparkHiveDynamicPartition fails when trying to write data from kafka to hive 
> using spark streaming
> --------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-22558
>                 URL: https://issues.apache.org/jira/browse/SPARK-22558
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core, Spark Shell, Spark Submit
>    Affects Versions: 2.1.1
>            Reporter: KhajaAsmath Mohammed
>   Original Estimate: 12h
>  Remaining Estimate: 12h
>
> I am able to write data from Kafka into a Hive table using Spark Streaming. 
> Batches run successfully for about a day, and after some successful runs I get 
> the errors below. Is there a way to resolve this?
> The insert uses dynamic Hive partitioning.
> Job aborted due to stage failure: Task 0 in stage 381.0 failed 4 times, most recent failure: Lost task 0.3 in stage 381.0 (TID 129383, brksvl255.brk.navistar.com, executor 1): org.apache.spark.SparkException: Task failed while writing rows.
>       at org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.writeToFile(hiveWriterContainers.scala:328)
>       at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:210)
>       at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:210)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>       at org.apache.spark.scheduler.Task.run(Task.scala:99)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
>       at parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:152)
>       at parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:111)
>       at parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:112)
>       at org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper.close(ParquetRecordWriterWrapper.java:102)
>       at org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper.close(ParquetRecordWriterWrapper.java:119)
>       at org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.writeToFile(hiveWriterContainers.scala:320)
>       ... 8 more
> I am sure the problem is related to the dynamic partitioning. Here is the query 
> executed inside the DStream:
> insert into bonalab.datapoint_location partition(year, month)
> select vin, utctime, description, descriptionuom, providerdesc,
>        islocation, latitude, longitude, speed, value,
>        current_timestamp as processed_date,
>        1 as version,
>        year, month
> from bonalab.datapoint_location
> where year = 2017
>   and month = 10
> group by year, month, vin, utctime, description, descriptionuom, providerdesc,
>          islocation, latitude, longitude, speed, value, processed_date
> limit 15
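>
> The HiveDAO.Geofences.insertLocationDataPoints helper called in the snippet below just 
> builds that statement with the target database passed in, roughly like this 
> (simplified for this report):
>
> object HiveDAO {
>   object Geofences {
>     // Returns the dynamic-partition insert shown above. tempView is the view
>     // registered from each micro-batch; as written, the statement itself reads
>     // from the target table rather than from tempView.
>     def insertLocationDataPoints(tempView: String, db: String): String =
>       s"""insert into $db.datapoint_location partition(year, month)
>          |select vin, utctime, description, descriptionuom, providerdesc,
>          |       islocation, latitude, longitude, speed, value,
>          |       current_timestamp as processed_date, 1 as version,
>          |       year, month
>          |from $db.datapoint_location
>          |where year = 2017 and month = 10
>          |group by year, month, vin, utctime, description, descriptionuom, providerdesc,
>          |         islocation, latitude, longitude, speed, value, processed_date
>          |limit 15""".stripMargin
>   }
> }
>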
>  val datapointDF = datapointDStream.foreachRDD { rdd =>
>    if (!runMode.equalsIgnoreCase("local")) {
>      sparkSession.sql("set hive.exec.dynamic.partition.mode=nonstrict")
>      sparkSession.sql("SET hive.exec.max.dynamic.partitions.pernode = 400")
>      sparkSession.sql("set hive.exec.dynamic.partition = true")
>    }
>    if (!rdd.isEmpty) {
>      /* val sparkSession = SparkSession.builder.enableHiveSupport.getOrCreate
>         import sparkSession.implicits._ */
>      val datapointDstreamDF = rdd.toDS
>      // println("DataPoint data")
>      // datapointDstreamDF.show(1)
>      datapointDstreamDF.createOrReplaceTempView("datapoint_tmp")
>      sparkSession.sql(HiveDAO.Geofences.insertLocationDataPoints("datapoint_tmp", hiveDBInstance))
>    }
>  }
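>
> For completeness, datapointDStream is created from Kafka roughly like this (the broker, 
> topic, group id, batch interval, DataPoint field types and the parsing step are 
> simplified placeholders, not the exact job code):
>
> import org.apache.kafka.common.serialization.StringDeserializer
> import org.apache.spark.streaming.{Seconds, StreamingContext}
> import org.apache.spark.streaming.kafka010.KafkaUtils
> import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
> import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
>
> // Field names follow the select list in the insert above; types are guesses.
> case class DataPoint(vin: String, utctime: String, description: String,
>                      descriptionuom: String, providerdesc: String, islocation: String,
>                      latitude: Double, longitude: Double, speed: Double, value: Double,
>                      year: Int, month: Int)
>
> // Format-specific parsing of a Kafka record value; elided here.
> def parseDataPoint(raw: String): DataPoint = ???
>
> val ssc = new StreamingContext(sparkSession.sparkContext, Seconds(60))
>
> val kafkaParams = Map[String, Object](
>   "bootstrap.servers" -> "broker1:9092",
>   "key.deserializer" -> classOf[StringDeserializer],
>   "value.deserializer" -> classOf[StringDeserializer],
>   "group.id" -> "datapoint-streaming")
>
> // Direct stream from Kafka (spark-streaming-kafka-0-10); each record value is
> // parsed into a DataPoint before the foreachRDD loop above writes it to Hive.
> val datapointDStream = KafkaUtils
>   .createDirectStream[String, String](ssc, PreferConsistent,
>     Subscribe[String, String](Array("datapoint_topic"), kafkaParams))
>   .map(record => parseDataPoint(record.value))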


