[ 
https://issues.apache.org/jira/browse/SPARK-15673?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jamie Hutton updated SPARK-15673:
---------------------------------
    Description: 
I have raised a couple of bugs to do with spark hanging. One of the previous 
ones (https://issues.apache.org/jira/browse/SPARK-15000) has been resolved in 
1.6.1 but the following example is still an issue in 1.6.1. 

The code below is a self-contained test case which generates some data and will 
lead to the hanging behaviour when run in spark-submit in 1.6.0 or 1.6.1. 
Strangely, the code also hangs in spark-shell in 1.6.0 but it doesn't seem to in 
1.6.1 (hence providing the main method test below). I run this using:

spark-submit --class HangingTest --master local <path-to-compiled-jar>

The hanging doesn't occur if you remove either of the first two cache steps OR 
the sort steps (I have added comments to this effect below). We have hit quite 
a few indefinite hanging issues with spark (another is this: 
https://issues.apache.org/jira/browse/SPARK-15002). There seems to be a rather 
fundamental issue with chaining steps together and using the cache call. 

The bug seems to be confined to reading data out of hadoop - if we put the data 
onto a local drive (using file://) then the hanging stops happening. 

This may seem rather a convoluted test case but that is mainly because I have 
stripped the code back to the simplest possible code that causes the issue.

*CODE BELOW*


import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.functions.count
import org.apache.spark.sql.functions.desc

/**
 * Self-contained reproduction for SPARK-15673: an indefinite hang reported with
 * the combination of `cache`, `sort` and `unionAll` on Spark 1.6.0 / 1.6.1.
 *
 * The program generates a small two-column data set, writes it out as two
 * parquet data sets under /tmp, reads them back and then chains
 * cache -> groupBy/agg/sort -> cache -> unionAll -> groupBy/agg/sort -> cache.
 *
 * The "removing this stops the hanging" notes below are the reporter's
 * observations; they describe the reported behaviour, not anything this code
 * checks. The statement order is part of the reproduction, so do not reorder.
 */
object HangingTest {
  /**
   * Entry point; intended to be launched through spark-submit.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {    
   
    /* Nothing is set on the SparkConf here; the report launches this with
       `spark-submit --class HangingTest --master local <jar>`. */
    val conf =   new SparkConf()
    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    
    /* Generate some data: one (var1, var2) pair for every i in 0..500 inclusive,
       where var1 = i and var2 is a random value in [0, 500) converted to Long. */
    val r = scala.util.Random
    val list = (0L to 500L).map(i=>(i,r.nextInt(500).asInstanceOf[Long]))
    val distData = sc.parallelize(list)
    /* Brings the implicit conversions into scope that provide .toDF on the RDD. */
    import sqlContext.implicits._
    val df=distData.toDF("var1","var2")
    
    /* Data set 1: both columns, overwriting any previous output at this path. */
df.write.format("parquet").mode("overwrite").save("/tmp/df_hanging_test1.parquet")
    
    /* Data set 2: only the var2 column, overwriting any previous output. */
df.select("var2").write.format("parquet").mode("overwrite").save("/tmp/df_hanging_test2.parquet")

    /* Read data set 1 back from storage rather than reusing df. */
    val df1 = sqlContext.read.load("/tmp/df_hanging_test1.parquet")

    /* Reporter's note: removing this step stops the hanging. */
    df1.cache
    
    /* Count of rows per var1 value, ordered by that count descending.
       Reporter's note: removing the sort part of this step stops the hanging. */
    val freq=df1.select("var1").groupBy("var1").agg(count("var1") as 
"var1_cnt").sort(desc("var1_cnt"))

    /* freq is not referenced again below.
       Reporter's note: removing this step stops the hanging. */
    freq.cache
        
    val df2 = sqlContext.read.load("/tmp/df_hanging_test2.parquet")
    
    /* Stack the var2 column of data set 1 on top of the var2 column of data set 2. */
    val var2=df1.select("var2").unionAll(df2.select("var2"))
    
    /* Count of rows per var2 value over the union, ordered by count descending.
       Reporter's note: removing the sort part of this step stops the hanging. */
    val freq2=var2.select("var2").groupBy("var2").agg(count("var2") as 
"var2_cnt").sort(desc("var2_cnt"))

    /* Reporter's note: this cache step hangs indefinitely. */
    freq2.cache
    
    /* Reporter's note: the .show never happens - it gets stuck on the .cache above. */
    freq2.show
  }
}

  was:
I have raised a couple of bugs to do with spark hanging. One of the previous 
ones (https://issues.apache.org/jira/browse/SPARK-15000) has been resolved in 
1.6.1 but the following example is still an issue in 1.6.1. 

The code below is a self-contained test case which generates some data and will 
lead to the hanging behaviour when run in spark-submit in 1.6.0 or 1.6.1. 
Strangely, the code also hangs in spark-shell in 1.6.0 but it doesn't seem to in 
1.6.1 (hence providing the main method test below). I run this using:

spark-submit --class HangingTest --master local <path-to-compiled-jar>

The hanging doesn't occur if you remove either of the first two cache steps OR 
the sort steps (I have added comments to this effect below). We have hit quite 
a few indefinite hanging issues with spark (another is this: 
https://issues.apache.org/jira/browse/SPARK-15002). There seems to be a rather 
fundamental issue with chaining steps together and using the cache call. 

The bug seems to be confined to reading data out of hadoop - if we put the data 
onto a local drive (using file://) then the hanging stops happening. 

This may seem rather a convoluted test case but that is mainly because I have 
stripped the code back to the simplest possible code that causes the issue.

*CODE BELOW*


import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.functions.count
import org.apache.spark.sql.functions.desc

/**
 * Self-contained reproduction for an indefinite hang reported with the
 * combination of `cache`, `sort` and `unionAll` on Spark 1.6.0 / 1.6.1.
 *
 * Generates a small two-column data set, writes it as two parquet data sets
 * under /tmp, reads them back and chains cache, groupBy/agg/sort and unionAll.
 * The "removing this stops the hanging" notes are the reporter's observations;
 * the statement order is part of the reproduction, so do not reorder.
 */
object HangingTest {
  /**
   * Entry point; intended to be launched through spark-submit.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {    
    
    /* Nothing is set on the SparkConf here; the report launches this with
       `spark-submit --class HangingTest --master local <jar>`. */
    val conf =   new SparkConf()

    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    
    /* Generate some data: one (var1, var2) pair for every i in 0..500 inclusive,
       where var1 = i and var2 is a random value in [0, 500) converted to Long. */
    val r = scala.util.Random
    val list = (0L to 500L).map(i=>(i,r.nextInt(500).asInstanceOf[Long]))
    val distData = sc.parallelize(list)
    /* Brings the implicit conversions into scope that provide .toDF on the RDD. */
    import sqlContext.implicits._
    val df=distData.toDF("var1","var2")
    
    /* Data set 1: both columns, overwriting any previous output at this path. */
df.write.format("parquet").mode("overwrite").save("/tmp/df_hanging_test1.parquet")
    
    /* Data set 2: only the var2 column, overwriting any previous output. */
df.select("var2").write.format("parquet").mode("overwrite").save("/tmp/df_hanging_test2.parquet")
    
    
    
    /* Read data set 1 back from storage rather than reusing df. */
    val df1 = sqlContext.read.load("/tmp/df_hanging_test1.parquet")
    /* Reporter's note: removing this step stops the hanging. */
    df1.cache
    
    /* Count of rows per var1 value, ordered by that count descending.
       Reporter's note: removing the sort part of this step stops the hanging. */
    val freq=df1.select("var1").groupBy("var1").agg(count("var1") as 
"var1_cnt").sort(desc("var1_cnt"))
    /* freq is not referenced again below.
       Reporter's note: removing this step stops the hanging. */
    freq.cache
        
    val df2 = sqlContext.read.load("/tmp/df_hanging_test2.parquet")
    
    /* Stack the var2 column of data set 1 on top of the var2 column of data set 2. */
    val var2=df1.select("var2").unionAll(df2.select("var2"))
    
    /* Count of rows per var2 value over the union, ordered by count descending.
       Reporter's note: removing the sort part of this step stops the hanging. */
    val freq2=var2.select("var2").groupBy("var2").agg(count("var2") as 
"var2_cnt").sort(desc("var2_cnt"))

    /* Reporter's note: this cache step hangs indefinitely. */
    freq2.cache
    
    /* Reporter's note: the .show never happens - it gets stuck on the .cache above. */
    freq2.show

  }

}


> Indefinite hanging issue with combination of cache, sort and unionAll
> ---------------------------------------------------------------------
>
>                 Key: SPARK-15673
>                 URL: https://issues.apache.org/jira/browse/SPARK-15673
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.6.0, 1.6.1
>         Environment: I am running the test code on both a hortonworks sandbox 
> and also on AWS EMR / EC2. 
>            Reporter: Jamie Hutton
>
> I have raised a couple of bugs to do with spark hanging. One of the previous 
> ones (https://issues.apache.org/jira/browse/SPARK-15000) has been resolved in 
> 1.6.1 but the following example is still an issue in 1.6.1. 
> The code below is a self-contained test case which generates some data and 
> will lead to the hanging behaviour when run in spark-submit in 1.6.0 or 
> 1.6.1. Strangely, the code also hangs in spark-shell in 1.6.0 but it doesn't 
> seem to in 1.6.1 (hence providing the main method test below). I run this 
> using:
> spark-submit --class HangingTest --master local <path-to-compiled-jar>
> The hanging doesn't occur if you remove either of the first two cache steps OR 
> the sort steps (I have added comments to this effect below). We have hit 
> quite a few indefinite hanging issues with spark (another is this: 
> https://issues.apache.org/jira/browse/SPARK-15002). There seems to be a 
> rather fundamental issue with chaining steps together and using the cache 
> call. 
> The bug seems to be confined to reading data out of hadoop - if we put the 
> data onto a local drive (using file://) then the hanging stops happening. 
> This may seem rather a convoluted test case but that is mainly because I have 
> stripped the code back to the simplest possible code that causes the issue.
> *CODE BELOW*
> import org.apache.spark.SparkConf
> import org.apache.spark.SparkContext
> import org.apache.spark.sql.functions.count
> import org.apache.spark.sql.functions.desc
> object HangingTest {
>   def main(args: Array[String]): Unit = {    
>    
>     val conf =   new SparkConf()
>     val sc = new SparkContext(conf)
>     val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>     
>     /*Generate some data*/
>     val r = scala.util.Random
>     val list = (0L to 500L).map(i=>(i,r.nextInt(500).asInstanceOf[Long]))
>     val distData = sc.parallelize(list)
>     import sqlContext.implicits._
>     val df=distData.toDF("var1","var2")
>     
> df.write.format("parquet").mode("overwrite").save("/tmp/df_hanging_test1.parquet")
>     
> df.select("var2").write.format("parquet").mode("overwrite").save("/tmp/df_hanging_test2.parquet")
>     val df1 = sqlContext.read.load("/tmp/df_hanging_test1.parquet")
>     /*Removing this step stops the hanging*/
>     df1.cache
>     
>     /*Removing the sort part of this step stops the hanging*/
>     val freq=df1.select("var1").groupBy("var1").agg(count("var1") as 
> "var1_cnt").sort(desc("var1_cnt"))
>     /*Removing this step stops the hanging*/
>     freq.cache
>         
>     val df2 = sqlContext.read.load("/tmp/df_hanging_test2.parquet")
>     
>     val var2=df1.select("var2").unionAll(df2.select("var2"))
>     
>     /*Removing the sort part of this step stops the hanging*/
>     val freq2=var2.select("var2").groupBy("var2").agg(count("var2") as 
> "var2_cnt").sort(desc("var2_cnt"))
>     /*this cache step hangs indefinitely*/
>     freq2.cache
>     
>     /*the .show never happens - it gets stuck on the .cache above*/
>     freq2.show
>   }
> }



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to