[
https://issues.apache.org/jira/browse/SPARK-16087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15932527#comment-15932527
]
Mark Heimann commented on SPARK-16087:
--------------------------------------
We're most likely seeing the same issue in our project on Spark 1.6.2; the
main difference is that we're using the Dataset API instead of working with
RDDs directly.
A few observations that may be of interest:
- Adding a {{coalesce}} after the union resolves the hang for us; the
partition count passed to {{coalesce}} appears to be irrelevant (see the
sketch after this list)
- The hang is likewise avoided when the right-side Dataset is
{{repartition}}-ed before the union
- We only encounter this issue when testing locally with input data loaded
from HDFS; when the same Spark application runs on our cluster under YARN, we
have not hit the problem yet (which I find odd)
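For reference, a minimal sketch of the {{coalesce}} workaround, written
against the Spark 2.x Dataset API (the issue also lists 2.0.1 as affected);
the class name and input paths are illustrative, not taken from our job:
{code:java}
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class UnionCoalesceWorkaround {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .master("local[*]")
            .appName(UnionCoalesceWorkaround.class.getName())
            .getOrCreate();

        // Left side: loaded, cached and materialized before the union,
        // mirroring the scenario in the issue description.
        Dataset<Row> left = spark.read().parquet("hdfs://localhost:9000/input-a").cache();
        System.out.println("Before union: " + left.count());

        Dataset<Row> right = spark.read().parquet("hdfs://localhost:9000/input-b");

        // Workaround: coalesce directly after the union; the partition count
        // passed to coalesce did not appear to matter in our tests.
        Dataset<Row> combined = left.union(right).coalesce(4);
        System.out.println("After union: " + combined.count());

        spark.stop();
    }
}
{code}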
> Spark Hangs When Using Union With Persisted Hadoop RDD
> ------------------------------------------------------
>
> Key: SPARK-16087
> URL: https://issues.apache.org/jira/browse/SPARK-16087
> Project: Spark
> Issue Type: Bug
> Affects Versions: 1.4.1, 1.6.1, 2.0.1
> Reporter: Kevin Conaway
> Priority: Critical
> Attachments: part-00000, part-00001, Screen Shot 2016-06-21 at
> 4.27.26 PM.png, Screen Shot 2016-06-21 at 4.27.35 PM.png,
> SPARK-16087.dump.log, SPARK-16087.log, spark-16087.tar.gz
>
>
> Spark hangs when materializing a persisted RDD that was built from a Hadoop
> sequence file and then union-ed with a similar RDD.
> Below is a small program that exhibits the issue:
> {code:java}
> import org.apache.hadoop.io.BytesWritable;
> import org.apache.hadoop.io.LongWritable;
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaPairRDD;
> import org.apache.spark.api.java.JavaSparkContext;
> import org.apache.spark.api.java.function.PairFunction;
> import org.apache.spark.serializer.KryoSerializer;
> import org.apache.spark.storage.StorageLevel;
> import scala.Tuple2;
> public class SparkBug {
>     public static void main(String[] args) throws Exception {
>         JavaSparkContext sc = new JavaSparkContext(
>             new SparkConf()
>                 .set("spark.serializer", KryoSerializer.class.getName())
>                 .set("spark.master", "local[*]")
>                 .setAppName(SparkBug.class.getName())
>         );
>         JavaPairRDD<LongWritable, BytesWritable> rdd1 = sc.sequenceFile(
>             "hdfs://localhost:9000/part-00000",
>             LongWritable.class,
>             BytesWritable.class
>         ).mapToPair(new PairFunction<Tuple2<LongWritable, BytesWritable>, LongWritable, BytesWritable>() {
>             @Override
>             public Tuple2<LongWritable, BytesWritable> call(Tuple2<LongWritable, BytesWritable> tuple) throws Exception {
>                 return new Tuple2<>(
>                     new LongWritable(tuple._1.get()),
>                     new BytesWritable(tuple._2.copyBytes())
>                 );
>             }
>         }).persist(
>             StorageLevel.MEMORY_ONLY()
>         );
>         System.out.println("Before union: " + rdd1.count());
>         JavaPairRDD<LongWritable, BytesWritable> rdd2 = sc.sequenceFile(
>             "hdfs://localhost:9000/part-00001",
>             LongWritable.class,
>             BytesWritable.class
>         );
>         JavaPairRDD<LongWritable, BytesWritable> joined = rdd1.union(rdd2);
>         System.out.println("After union: " + joined.count());
>     }
> }
> {code}
> You'll need to upload the attached part-00000 and part-00001 files to a local
> HDFS instance (I'm just using a dummy [Single Node
> Cluster|http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/SingleCluster.html]
> locally).
> Some things to note:
> - It does not hang if rdd1 is not persisted
> - It does not hang if rdd1 is not materialized (by calling rdd1.count())
> before the union-ed RDD is materialized
> - It does not hang if the mapToPair() transformation is removed (sketched
> below)
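> For illustration, a hedged sketch of the last point, assuming sc and rdd2
> are defined exactly as in the program above:
> {code:java}
> // Variation: persist and materialize the raw sequence-file RDD without the
> // mapToPair() copy, then union it with rdd2. Per the note above, this
> // variant does not hang.
> JavaPairRDD<LongWritable, BytesWritable> raw1 = sc.sequenceFile(
>     "hdfs://localhost:9000/part-00000",
>     LongWritable.class,
>     BytesWritable.class
> ).persist(StorageLevel.MEMORY_ONLY());
> System.out.println("Before union: " + raw1.count());
> System.out.println("After union: " + raw1.union(rdd2).count());
> {code}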