[ https://issues.apache.org/jira/browse/SPARK-20158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15949643#comment-15949643 ]

Paride Casulli commented on SPARK-20158:
----------------------------------------

Hi,
our code uses Spark Streaming to copy rows from uncompressed (and
unpartitioned) CSV files into compressed (and partitioned) Hive tables stored as
Parquet files. The program runs on a YARN cluster (client mode) with dynamic
allocation of executors enabled. We have installed Spark 2.1 alongside a
BigInsights 4.1 Hadoop distribution in order to test performance and to avoid
this bug affecting our program: https://issues.apache.org/jira/browse/SPARK-9591

The problem arises under heavy workload (when reading/writing lots of big files
on HDFS makes Spark Streaming fall behind, with batch processing taking longer
than the batch duration), while everything seems OK under normal/low workload.
Once triggered, the problem persists even if I stop copying new input files...
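A simplified model helps picture how a Spark Streaming job falls behind. In this model a new batch is generated every batch interval, batches are processed one after another, and each batch takes a fixed processing time. The sketch below assumes those simplifications. The class name and the 60 s / 75 s figures are hypothetical and were not measured on this cluster. When processing time exceeds the interval, each batch waits (processing time minus interval) longer than the batch before it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: fixed batch interval, fixed processing time, one batch at a time.
class BatchBacklog {
    // Returns how long each batch waits between being generated and starting to run.
    static List<Double> schedulingDelays(double intervalSec, double processingSec, int batches) {
        List<Double> delays = new ArrayList<>();
        double previousFinish = 0.0;
        for (int n = 0; n < batches; n++) {
            double arrival = n * intervalSec;                  // batch n is generated at n * interval
            double start = Math.max(arrival, previousFinish);  // it cannot start before the previous batch ends
            delays.add(start - arrival);
            previousFinish = start + processingSec;
        }
        return delays;
    }

    public static void main(String[] args) {
        // Hypothetical figures: 60 s batch interval, 75 s processing time per batch.
        System.out.println(schedulingDelays(60, 75, 5)); // [0.0, 15.0, 30.0, 45.0, 60.0]
    }
}
```

With these figures the delay grows by 15 s per batch. The backlog only stops growing once processing time drops back below the interval.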

The impacted tables (2 tables in total) have about 200 and 300 fields
respectively; the input rate is about 1.6 GB/minute of uncompressed CSV per table.
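For scale, the stated rate can be converted to a per-second figure. This small sketch assumes decimal units (1 GB = 1000 MB). The class and method names are hypothetical.

```java
import java.util.Locale;

// Hypothetical helper converting the reported ingest rate; assumes 1 GB = 1000 MB.
class IngestRate {
    static double mbPerSecond(double gbPerMinute) {
        return gbPerMinute * 1000.0 / 60.0;
    }

    public static void main(String[] args) {
        double perTable = mbPerSecond(1.6); // about 26.7 MB/s per table
        double total = 2 * perTable;        // two tables, about 53.3 MB/s
        System.out.printf(Locale.ROOT, "per table: %.1f MB/s, total: %.1f MB/s%n", perTable, total);
    }
}
```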

The problem appears simultaneously on all 4 jobs we have running on our
cluster (submitted with 3 spark-submit commands). Below is a snippet; the code
was converted on the fly from the Spark 1.x version, and sqlContext is an
instance of SparkSession:

        JavaDStream<String[]> streamRes = textFileStream.map((String s) -> s.split(","));
        boolean loop = true;
        do {
            try {
                streamRes.map(rackingMain::getRow).foreachRDD(rowJavaRDD -> {
                    if (!rowJavaRDD.isEmpty()) {
                        Dataset<Row> dataFrame = sqlContext.createDataFrame(rowJavaRDD, type);
                        LOG.info("BEFORE CALLBACK");
                        rackingMain.callbackProcess(dataFrame, sqlContext, tempTable, rackingQuery);
                        //TODO re-enable insertLogFile(dataFrame);
                    }
                });
                loop = false;
            } catch (Exception e) {
                LOG.error(e.getMessage());
                LOG.error("Faulty step (error in external block), re-ingest...");
            }
        } while (loop);
        LOG.info("Streaming context started");
        ssc.start();
        ssc.awaitTermination();
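One detail of the first map step is worth knowing. In standard Java, `String.split(",")` drops trailing empty strings, so a CSV line ending in empty fields produces an array shorter than the schema. This is a general Java behaviour and is not claimed to be the cause of the crash below. The sketch checks the column count up front so that a bad line is reported by name. It assumes a fixed expected field count. `CsvSplit` and `splitChecked` are hypothetical names, and quoted fields containing commas are not handled.

```java
import java.util.Arrays;

// Hypothetical helper: split one CSV line and verify the column count.
class CsvSplit {
    static String[] splitChecked(String line, int expectedFields) {
        // The -1 limit keeps trailing empty fields that split(",") would drop.
        // Quoted fields containing commas are NOT handled here.
        String[] parts = line.split(",", -1);
        if (parts.length != expectedFields) {
            throw new IllegalArgumentException(
                    "expected " + expectedFields + " fields but got " + parts.length + ": " + line);
        }
        return parts;
    }

    public static void main(String[] args) {
        String line = "a,b,,";
        System.out.println(line.split(",").length);                 // 2: trailing empty strings dropped
        System.out.println(line.split(",", -1).length);             // 4: trailing empty strings kept
        System.out.println(Arrays.toString(splitChecked(line, 4))); // [a, b, , ]
    }
}
```

Rejecting, skipping, or padding malformed lines is a design choice. This sketch rejects them so the failure names the offending line.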

....
....
....

    default public void callbackProcess(Dataset<Row> dataFrame, SparkSession sqlContext,
            String tempTable, String rackingQuery, boolean deleteTemp) {
        boolean loop = true;
        do {
            try {
                System.out.println("generic callback on " + tempTable);
                //dataFrame = dataFrame.coalesce(9);
                dataFrame.createOrReplaceTempView(tempTable);
                System.out.println("Racking query: " + rackingQuery);
                sqlContext.sql(rackingQuery);
                loop = false;
            } catch (Exception e) {
                this.getLogger().error(e.getMessage());
                this.getLogger().error("Faulty step (error in internal block), re-ingest...");
            }
        } while (loop);
    }
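Both retry blocks above loop without limit. If a failure is deterministic, for example when the same DataFrame fails in the same way on every attempt, `callbackProcess` never leaves its loop. The sketch below shows a bounded alternative. It assumes that giving up after a fixed number of attempts and surfacing the last error is acceptable. `BoundedRetry` and `retry` are hypothetical names and not part of Spark.

```java
import java.util.concurrent.Callable;

// Hypothetical helper: run an action up to maxAttempts times, then rethrow the last failure.
class BoundedRetry {
    static <T> T retry(Callable<T> action, int maxAttempts) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e; // remember the failure and try again
                System.err.println("attempt " + attempt + " failed: " + e);
            }
        }
        throw last; // non-null here: the loop ran at least once and every attempt failed
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        String result = retry(() -> {
            calls[0]++;
            if (calls[0] < 3) {
                throw new IllegalStateException("transient failure");
            }
            return "ok";
        }, 5);
        System.out.println(result + " after " + calls[0] + " attempts"); // ok after 3 attempts
    }
}
```

A transient failure still succeeds on a later attempt. A persistent one is rethrown after `maxAttempts` tries instead of spinning forever.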

> crash in Spark sql insert in partitioned hive tables
> ----------------------------------------------------
>
>                 Key: SPARK-20158
>                 URL: https://issues.apache.org/jira/browse/SPARK-20158
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.0.0, 2.1.0
>         Environment: hive 1.2.1 on parquet file table
>            Reporter: Paride Casulli
>
> Hi, I get this exception while inserting data into a Parquet partitioned table
> in Hive from a temp view:
> Job aborted due to stage failure: Task 3 in stage 177.0 failed 4 times, most 
> recent failure: Lost task 3.3 in stage 177.0 (TID 3833, XXX.XXX.XXX.XXX, 
> executor 1008): org.apache.spark.SparkException: Task failed while writing 
> rows.
>       at org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.writeToFile(hiveWriterContainers.scala:328)
>       at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:159)
>       at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:159)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>       at org.apache.spark.scheduler.Task.run(Task.scala:99)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ArrayIndexOutOfBoundsException



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
