[ https://issues.apache.org/jira/browse/SPARK-16087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15650788#comment-15650788 ]
Morten Hornbech commented on SPARK-16087:
-----------------------------------------

I can reproduce this issue on Spark 2.0.1 using the Cassandra connector. Double unions trigger a hang on collect; single unions are OK. Test below:

{code:scala}
import com.datastax.driver.core.Cluster
import com.datastax.spark.connector._
import com.websudos.phantom.connectors.KeySpaceDef
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.scalatest.FunSuite

case class TestData(id: String)

class ReproTest extends FunSuite {
  test("error reproduction scenario") {
    // SETUP: Build key space and tables.
    val keySpaceName = "test"
    val tableA = "test_table_a"
    val tableB = "test_table_b"
    val tableC = "test_table_c"
    val builder = Cluster.builder().addContactPoint("127.0.0.1").withPort(9142)
    val keySpaceDef = new KeySpaceDef(
      keySpaceName,
      _ => builder,
      true,
      Some((ses, ks) => s"CREATE KEYSPACE IF NOT EXISTS $keySpaceName WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}"))
    keySpaceDef.session.execute(s"DROP TABLE IF EXISTS ${keySpaceName}.${tableA}")
    keySpaceDef.session.execute(s"DROP TABLE IF EXISTS ${keySpaceName}.${tableB}")
    keySpaceDef.session.execute(s"DROP TABLE IF EXISTS ${keySpaceName}.${tableC}")

    // SETUP: Create spark session.
    val config = new SparkConf()
      .setMaster("local[*]")
      .set("spark.cassandra.connection.port", "9142")
      .setAppName("test")
      .set("spark.cassandra.connection.host", "127.0.0.1")
      .set("spark.sql.warehouse.dir", "file:///C:/temp")
    val session = SparkSession.builder().config(config).getOrCreate()

    // SETUP: Create and persist data.
    val data = List(TestData("Foo"))
    val frame = session.sqlContext.createDataFrame(session.sparkContext.parallelize(data))
    frame.createCassandraTable(keySpaceName, tableA)
    frame.createCassandraTable(keySpaceName, tableB)
    frame.createCassandraTable(keySpaceName, tableC)
    frame
      .write
      .format("org.apache.spark.sql.cassandra")
      .mode(SaveMode.Append)
      .options(Map("table" -> tableA, "keyspace" -> keySpaceName))
      .save()
    frame
      .write
      .format("org.apache.spark.sql.cassandra")
      .mode(SaveMode.Append)
      .options(Map("table" -> tableB, "keyspace" -> keySpaceName))
      .save()
    frame
      .write
      .format("org.apache.spark.sql.cassandra")
      .mode(SaveMode.Append)
      .options(Map("table" -> tableC, "keyspace" -> keySpaceName))
      .save()

    // TEST: Load frames and register temp views.
    // (createOrReplaceTempView returns Unit, so no point binding the results.)
    session.sqlContext
      .read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("table" -> tableA, "keyspace" -> keySpaceName))
      .load()
      .createOrReplaceTempView("A")
    session.sqlContext
      .read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("table" -> tableB, "keyspace" -> keySpaceName))
      .load()
      .createOrReplaceTempView("B")
    session.sqlContext
      .read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("table" -> tableC, "keyspace" -> keySpaceName))
      .load()
      .createOrReplaceTempView("C")

    val rowsOK = session.sql("select id from A union select id from B").collect()
    val rowsHang = session.sql("select id from A union select id from B union select id from C").collect()
  }
}
{code}
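For comparison, the same queries can be expressed through the DataFrame API rather than SQL. A minimal sketch, assuming the temp views A, B and C registered by the test above; note that SQL UNION deduplicates while Dataset.union keeps duplicates, so .distinct() is appended to mirror the queries exactly. Whether this variant hangs in the same way has not been verified here:

{code:scala}
// Hypothetical DataFrame-API variant of the SQL queries in the test above.
// Assumes the temp views A, B and C already exist. Dataset.union behaves
// like UNION ALL, so .distinct() is added to match SQL UNION semantics.
// Not verified to reproduce the hang.
val a = session.table("A").select("id")
val b = session.table("B").select("id")
val c = session.table("C").select("id")

val rowsOK = a.union(b).distinct().collect()             // single union: works in the SQL form
val rowsHang = a.union(b).union(c).distinct().collect()  // double union: the SQL form hangs
{code}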
> Spark Hangs When Using Union With Persisted Hadoop RDD
> ------------------------------------------------------
>
>                 Key: SPARK-16087
>                 URL: https://issues.apache.org/jira/browse/SPARK-16087
>             Project: Spark
>          Issue Type: Bug
>    Affects Versions: 1.4.1, 1.6.1
>            Reporter: Kevin Conaway
>            Priority: Critical
>         Attachments: SPARK-16087.dump.log, SPARK-16087.log, Screen Shot 2016-06-21 at 4.27.26 PM.png, Screen Shot 2016-06-21 at 4.27.35 PM.png, part-00000, part-00001, spark-16087.tar.gz
>
>
> Spark hangs when materializing a persisted RDD that was built from a Hadoop sequence file and then union-ed with a similar RDD.
> Below is a small file that exhibits the issue:
> {code:java}
> import org.apache.hadoop.io.BytesWritable;
> import org.apache.hadoop.io.LongWritable;
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaPairRDD;
> import org.apache.spark.api.java.JavaSparkContext;
> import org.apache.spark.api.java.function.PairFunction;
> import org.apache.spark.serializer.KryoSerializer;
> import org.apache.spark.storage.StorageLevel;
> import scala.Tuple2;
>
> public class SparkBug {
>     public static void main(String [] args) throws Exception {
>         JavaSparkContext sc = new JavaSparkContext(
>             new SparkConf()
>                 .set("spark.serializer", KryoSerializer.class.getName())
>                 .set("spark.master", "local[*]")
>                 .setAppName(SparkBug.class.getName())
>         );
>
>         JavaPairRDD<LongWritable, BytesWritable> rdd1 = sc.sequenceFile(
>             "hdfs://localhost:9000/part-00000",
>             LongWritable.class,
>             BytesWritable.class
>         ).mapToPair(new PairFunction<Tuple2<LongWritable, BytesWritable>, LongWritable, BytesWritable>() {
>             @Override
>             public Tuple2<LongWritable, BytesWritable> call(Tuple2<LongWritable, BytesWritable> tuple) throws Exception {
>                 // Copy the Hadoop Writables so the cached values do not share reused instances.
>                 return new Tuple2<>(
>                     new LongWritable(tuple._1.get()),
>                     new BytesWritable(tuple._2.copyBytes())
>                 );
>             }
>         }).persist(
>             StorageLevel.MEMORY_ONLY()
>         );
>
>         System.out.println("Before union: " + rdd1.count());
>
>         JavaPairRDD<LongWritable, BytesWritable> rdd2 = sc.sequenceFile(
>             "hdfs://localhost:9000/part-00001",
>             LongWritable.class,
>             BytesWritable.class
>         );
>
>         JavaPairRDD<LongWritable, BytesWritable> joined = rdd1.union(rdd2);
>
>         System.out.println("After union: " + joined.count());
>     }
> }
> {code}
> You'll need to upload the attached part-00000 and part-00001 to a local HDFS instance (I'm just using a dummy [Single Node Cluster|http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/SingleCluster.html] locally).
> Some things to note:
> - It does not hang if rdd1 is not persisted
> - It does not hang if rdd1 is not materialized (via calling rdd1.count()) before the union-ed RDD is materialized
> - It does not hang if the mapToPair() transformation is removed.
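The second note above suggests a possible workaround, sketched below: build rdd1 exactly as in the reproduction, but defer its count() until after the union-ed RDD has been materialized. This is an illustration derived from the reported observations, assuming an existing SparkContext `sc` and the same HDFS paths, not a verified fix:

{code:scala}
// Sketch of a workaround implied by the notes above: keep persist() and
// the Writable-copying map, but do not materialize rdd1 before the union.
// Assumes an existing SparkContext `sc` and the same local HDFS layout as
// the Java reproduction. Illustrative only -- not a verified fix.
import org.apache.hadoop.io.{BytesWritable, LongWritable}
import org.apache.spark.storage.StorageLevel

val rdd1 = sc.sequenceFile("hdfs://localhost:9000/part-00000",
    classOf[LongWritable], classOf[BytesWritable])
  .map { case (k, v) =>
    // Copy the Writables, as the original mapToPair() does.
    (new LongWritable(k.get()), new BytesWritable(v.copyBytes()))
  }
  .persist(StorageLevel.MEMORY_ONLY)

val rdd2 = sc.sequenceFile("hdfs://localhost:9000/part-00001",
  classOf[LongWritable], classOf[BytesWritable])

// Materialize the union first; only afterwards count rdd1 on its own.
val joined = rdd1.union(rdd2)
println("After union: " + joined.count())
println("rdd1: " + rdd1.count())
{code}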