[
https://issues.apache.org/jira/browse/SPARK-15673?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jamie Hutton updated SPARK-15673:
---------------------------------
Description:
I have raised a couple of bugs about Spark hanging. One of the previous
ones (https://issues.apache.org/jira/browse/SPARK-15000) was resolved in
1.6.1, but the following example is still an issue in 1.6.1.
The code below is a self-contained test case that generates some data and
leads to the hanging behaviour when run via spark-submit in 1.6.0 or 1.6.1.
Strangely, the code also hangs in spark-shell in 1.6.0 but doesn't seem to in
1.6.1 (hence providing the main-method test below). I run this using:
spark-submit --class HangingTest --master local <path-to-compiled-jar>
The hanging doesn't occur if you remove either of the first two cache steps OR
the sort steps (I have added comments to this effect below). We have hit quite
a few indefinite hanging issues with Spark (another is
https://issues.apache.org/jira/browse/SPARK-15002). There seems to be a rather
fundamental issue with chaining steps together and using the cache call.
The bug seems to be confined to reading data out of Hadoop - if we put the data
onto a local drive (using file://) then the hanging stops happening.
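For reference, the local-filesystem workaround mentioned above can be sketched as follows. This is illustrative only: it assumes the same /tmp paths exist on the driver's local disk, and the df1Local name is made up for the sketch.

```scala
// Illustrative workaround: write and read with an explicit file:// scheme so
// the Parquet data never touches HDFS. With a local path the hang does not
// occur (same df and sqlContext as in the test case below).
df.write.format("parquet").mode("overwrite")
  .save("file:///tmp/df_hanging_test1.parquet")
val df1Local = sqlContext.read.load("file:///tmp/df_hanging_test1.parquet")
```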
This may seem a rather convoluted test case, but that is mainly because I have
stripped the code back to the simplest possible code that still causes the
issue.
*CODE BELOW*
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.functions.count
import org.apache.spark.sql.functions.desc

object HangingTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)

    /* Generate some data */
    val r = scala.util.Random
    val list = (0L to 500L).map(i => (i, r.nextInt(500).toLong))
    val distData = sc.parallelize(list)
    import sqlContext.implicits._
    val df = distData.toDF("var1", "var2")
    df.write.format("parquet").mode("overwrite").save("/tmp/df_hanging_test1.parquet")
    df.select("var2").write.format("parquet").mode("overwrite").save("/tmp/df_hanging_test2.parquet")

    val df1 = sqlContext.read.load("/tmp/df_hanging_test1.parquet")
    /* Removing this step stops the hanging */
    df1.cache()

    /* Removing the sort part of this step stops the hanging */
    val freq = df1.select("var1").groupBy("var1")
      .agg(count("var1") as "var1_cnt").sort(desc("var1_cnt"))
    /* Removing this step stops the hanging */
    freq.cache()

    val df2 = sqlContext.read.load("/tmp/df_hanging_test2.parquet")
    val var2 = df1.select("var2").unionAll(df2.select("var2"))

    /* Removing the sort part of this step stops the hanging */
    val freq2 = var2.select("var2").groupBy("var2")
      .agg(count("var2") as "var2_cnt").sort(desc("var2_cnt"))
    /* This cache step hangs indefinitely */
    freq2.cache()

    /* The .show never happens - it gets stuck on the .cache above */
    freq2.show()
  }
}
> Indefinite hanging issue with combination of cache, sort and unionAll
> ---------------------------------------------------------------------
>
> Key: SPARK-15673
> URL: https://issues.apache.org/jira/browse/SPARK-15673
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 1.6.0, 1.6.1
> Environment: I am running the test code on both a Hortonworks sandbox
> and also on AWS EMR / EC2.
> Reporter: Jamie Hutton
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]