[
https://issues.apache.org/jira/browse/SPARK-20158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15949643#comment-15949643
]
Paride Casulli commented on SPARK-20158:
----------------------------------------
Hi,
our code uses Spark Streaming to copy rows from uncompressed (and
unpartitioned) CSV files into compressed (and partitioned) Hive tables stored
as Parquet files. The program runs on a YARN cluster (client mode) with
dynamic allocation of executors enabled. We installed Spark 2.1 alongside a
BigInsights 4.1 Hadoop distribution in order to test performance and to avoid
this bug, which affects our program:
https://issues.apache.org/jira/browse/SPARK-9591
The problem arises under heavy workload (when reading/writing many big files
on HDFS, which delays Spark Streaming beyond the streaming duration), while
everything seems OK under normal/low workload. Once triggered, the problem
persists even if I stop copying new input files...
The impacted tables (2 tables in total) have about 200 and 300 fields; the
input rate is about 1.6 GB/minute (for each table) of uncompressed CSVs.
The problem appears simultaneously on all 4 jobs we have running on our
cluster (submitted with 3 spark-submit commands). This is a snippet; the code
was converted on the fly from the Spark 1.x version, and sqlContext is an
instance of SparkSession:
JavaDStream<String[]> streamRes =
        textFileStream.map((String s) -> s.split(","));
boolean loop = true;
do {
    try {
        streamRes.map(rackingMain::getRow).foreachRDD(rowJavaRDD -> {
            if (!rowJavaRDD.isEmpty()) {
                Dataset<Row> dataFrame =
                        sqlContext.createDataFrame(rowJavaRDD, type);
                LOG.info("BEFORE CALLBACK");
                rackingMain.callbackProcess(dataFrame, sqlContext, tempTable, rackingQuery);
                //TODO re-enable
                insertLogFile(dataFrame);
            }
        });
        loop = false;
    } catch (Exception e) {
        LOG.error(e.getMessage());
        LOG.error("Faulty step (error in external block), re-ingest...");
    }
} while (loop == true);
LOG.info("Streaming context started");
ssc.start();
ssc.awaitTermination();
....
....
....
default public void callbackProcess(Dataset<Row> dataFrame,
        SparkSession sqlContext, String tempTable, String rackingQuery,
        boolean deleteTemp) {
    boolean loop = true;
    do {
        try {
            System.out.println("generic callback on " + tempTable);
            //dataFrame = dataFrame.coalesce(9);
            dataFrame.createOrReplaceTempView(tempTable);
            System.out.println("Racking query: " + rackingQuery);
            sqlContext.sql(rackingQuery);
            loop = false;
        } catch (Exception e) {
            this.getLogger().error(e.getMessage());
            this.getLogger().error("Faulty step (error in internal block), re-ingest...");
        }
    } while (loop == true);
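
Both blocks above use the same pattern: retry the step until it stops
throwing. As a minimal sketch of that pattern in isolation (not our production
code), the helper below does the same thing but gives up after a fixed number
of attempts and rethrows the last exception, so a persistent failure such as
the one in this issue would surface instead of looping forever. The class name
BoundedRetry, the method retry and the maxAttempts parameter are hypothetical
names introduced only for this illustration.

```java
import java.util.concurrent.Callable;

// Hypothetical helper, not part of Spark or of the code above.
final class BoundedRetry {
    private BoundedRetry() {}

    /**
     * Calls step until it returns normally or maxAttempts calls have failed.
     * When every attempt fails, the exception from the last attempt is rethrown.
     */
    static <T> T retry(int maxAttempts, Callable<T> step) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return step.call();
            } catch (Exception e) {
                // Remember the failure and log it, then try again.
                last = e;
                System.err.println("Faulty step (attempt " + attempt
                        + " of " + maxAttempts + "): " + e.getMessage());
            }
        }
        // All attempts failed: propagate the last error to the caller.
        throw last;
    }
}
```

With such a helper, the body of the inner loop could be written as
BoundedRetry.retry(3, () -> { sqlContext.sql(rackingQuery); return null; });
the limit of 3 is an arbitrary value chosen for the example.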
> crash in Spark sql insert in partitioned hive tables
> ----------------------------------------------------
>
> Key: SPARK-20158
> URL: https://issues.apache.org/jira/browse/SPARK-20158
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.0.0, 2.1.0
> Environment: hive 1.2.1 on parquet file table
> Reporter: Paride Casulli
>
> Hi, I have this exception while inserting data on a parquet partitioned table
> in hive from a temp view
> Job aborted due to stage failure: Task 3 in stage 177.0 failed 4 times, most
> recent failure: Lost task 3.3 in stage 177.0 (TID 3833, XXX.XXX.XXX.XXX,
> executor 1008): org.apache.spark.SparkException: Task failed while writing
> rows.
>     at org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.writeToFile(hiveWriterContainers.scala:328)
>     at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:159)
>     at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:159)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>     at org.apache.spark.scheduler.Task.run(Task.scala:99)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ArrayIndexOutOfBoundsException