[ https://issues.apache.org/jira/browse/SPARK-21721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xiao Li updated SPARK-21721:
----------------------------
    Priority: Critical  (was: Major)

> Memory leak in org.apache.spark.sql.hive.execution.InsertIntoHiveTable
> ----------------------------------------------------------------------
>
>                 Key: SPARK-21721
>                 URL: https://issues.apache.org/jira/browse/SPARK-21721
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.0.2, 2.1.1, 2.2.0
>            Reporter: yzheng616
>            Priority: Critical
>
> The leak comes from org.apache.spark.sql.hive.execution.InsertIntoHiveTable. 
> At line 118 it adds a staging path to the FileSystem deleteOnExit cache, and 
> at line 385 it removes that path from disk, but it never removes the path from 
> the FileSystem cache. If a streaming application keeps writing data to a 
> partitioned Hive table, memory keeps growing until the JVM terminates.
> Below is a simple program that reproduces the leak; a possible mitigation is sketched after the code block.
> {code:java}
> package test
>
> import java.lang.reflect.Field
>
> import org.apache.hadoop.fs.{FileSystem, Path}
> import org.apache.spark.sql.{SaveMode, SparkSession}
>
> case class PathLeakTest(id: Int, gp: String)
>
> object StagePathLeak {
>   def main(args: Array[String]): Unit = {
>     val spark = SparkSession.builder()
>       .master("local[4]")
>       .appName("StagePathLeak")
>       .enableHiveSupport()
>       .getOrCreate()
>     spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
>
>     // create a partitioned table
>     spark.sql("drop table if exists path_leak")
>     spark.sql("create table if not exists path_leak(id int)" +
>       " partitioned by (gp String)" +
>       " row format serde 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'" +
>       " stored as" +
>       " inputformat 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'" +
>       " outputformat 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'")
>
>     val seq = new scala.collection.mutable.ArrayBuffer[PathLeakTest]()
>     // 2 partitions
>     for (x <- 1 to 2) {
>       seq += PathLeakTest(x, "g" + x)
>     }
>     val rdd = spark.sparkContext.makeRDD[PathLeakTest](seq)
>
>     // overwrite-insert into the Hive table 50 times
>     for (j <- 1 to 50) {
>       val df = spark.createDataFrame(rdd)
>       // #1 InsertIntoHiveTable line 118: adds the staging path to the FileSystem deleteOnExit cache
>       // #2 InsertIntoHiveTable line 385: deletes the path from disk but not from the FileSystem cache, which causes the leak
>       df.write.mode(SaveMode.Overwrite).insertInto("path_leak")
>     }
>
>     val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
>     val deleteOnExit = getDeleteOnExit(fs.getClass)
>     deleteOnExit.setAccessible(true)
>     val caches = deleteOnExit.get(fs).asInstanceOf[java.util.TreeSet[Path]]
>     // check the FileSystem deleteOnExit cache size
>     println(caches.size())
>     val it = caches.iterator()
>     // all staging paths are still cached even though they have already been deleted from disk
>     while (it.hasNext()) {
>       println(it.next())
>     }
>   }
>
>   // walk up the class hierarchy to find the private deleteOnExit field declared on FileSystem
>   def getDeleteOnExit(cls: Class[_]): Field = {
>     try {
>       cls.getDeclaredField("deleteOnExit")
>     } catch {
>       case _: NoSuchFieldException => getDeleteOnExit(cls.getSuperclass)
>     }
>   }
> }
> {code}
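> One possible mitigation (a sketch only, not necessarily the change committed to Spark): after the staging directory has been removed from disk, also remove it from the FileSystem deleteOnExit cache via FileSystem.cancelDeleteOnExit, so long-running applications do not accumulate stale entries. The helper name deleteStagingDir and the way stagingDirPath is passed in are assumptions for illustration:
> {code:java}
> import org.apache.hadoop.conf.Configuration
> import org.apache.hadoop.fs.Path
>
> object StagingDirCleanup {
>   // Hypothetical helper, not the actual Spark code: delete a staging directory
>   // and drop it from the deleteOnExit cache so the TreeSet inside FileSystem
>   // does not grow for the lifetime of the JVM.
>   def deleteStagingDir(hadoopConf: Configuration, stagingDirPath: Path): Unit = {
>     val fs = stagingDirPath.getFileSystem(hadoopConf)
>     if (fs.exists(stagingDirPath)) {
>       fs.delete(stagingDirPath, true)
>     }
>     fs.cancelDeleteOnExit(stagingDirPath)
>   }
> }
> {code}
> Keeping the deleteOnExit registration still covers abnormal termination before the explicit delete runs; cancelling it afterwards simply prevents the cache from growing across repeated inserts.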
>  
>  



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
