[
https://issues.apache.org/jira/browse/SPARK-1712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13989876#comment-13989876
]
Piotr Kołaczkowski commented on SPARK-1712:
-------------------------------------------
This is the log from the shell:
{noformat}
scala> val rdd = sc.parallelize(collection)
14/05/05 21:23:16 DEBUG SparkILoop$SparkILoopInterpreter: parse(" val rdd = sc.parallelize(collection)
") Some(List(val rdd = sc.parallelize(collection)))
14/05/05 21:23:16 DEBUG SparkILoop$SparkILoopInterpreter: 11: ValDef
11: TypeTree
31: Apply
20: Select
17: Ident
32: Ident
14/05/05 21:23:16 DEBUG SparkILoop$SparkILoopInterpreter: parse("
class $read extends Serializable {
class $iwC extends Serializable {
val $VAL2 = $line3.$read.INSTANCE;
import $VAL2.$iw.$iw.`sc`;
class $iwC extends Serializable {
import org.apache.spark.SparkContext._
class $iwC extends Serializable {
class $iwC extends Serializable {
import com.datastax.bdp.spark.CassandraFunctions._
class $iwC extends Serializable {
import com.datastax.bdp.spark.context.CassandraContext
class $iwC extends Serializable {
import com.tuplejump.calliope.Implicits._
class $iwC extends Serializable {
val $VAL3 = $line17.$read.INSTANCE;
import $VAL3.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.`collection`;
class $iwC extends Serializable {
val rdd = sc.parallelize(collection)
}
val $iw = new $iwC;
}
val $iw = new $iwC;
}
val $iw = new $iwC;
}
val $iw = new $iwC;
}
val $iw = new $iwC;
}
val $iw = new $iwC;
}
val $iw = new $iwC;
}
val $iw = new $iwC;
}
object $read {
val INSTANCE = new $read();
}
") Some(List(class $read extends Serializable {
def <init>() = {
super.<init>();
()
};
class $iwC extends Serializable {
def <init>() = {
super.<init>();
()
};
val $VAL2 = $line3.$read.INSTANCE;
import $VAL2.$iw.$iw.sc;
class $iwC extends Serializable {
def <init>() = {
super.<init>();
()
};
import org.apache.spark.SparkContext._;
class $iwC extends Serializable {
def <init>() = {
super.<init>();
()
};
class $iwC extends Serializable {
def <init>() = {
super.<init>();
()
};
import com.datastax.bdp.spark.CassandraFunctions._;
class $iwC extends Serializable {
def <init>() = {
super.<init>();
()
};
import com.datastax.bdp.spark.context.CassandraContext;
class $iwC extends Serializable {
def <init>() = {
super.<init>();
()
};
import com.tuplejump.calliope.Implicits._;
class $iwC extends Serializable {
def <init>() = {
super.<init>();
()
};
val $VAL3 = $line17.$read.INSTANCE;
import $VAL3.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.collection;
class $iwC extends Serializable {
def <init>() = {
super.<init>();
()
};
val rdd = sc.parallelize(collection)
};
val $iw = new $iwC()
};
val $iw = new $iwC()
};
val $iw = new $iwC()
};
val $iw = new $iwC()
};
val $iw = new $iwC()
};
val $iw = new $iwC()
};
val $iw = new $iwC()
};
val $iw = new $iwC()
}, object $read extends scala.AnyRef {
def <init>() = {
super.<init>();
()
};
val INSTANCE = new $read()
}))
14/05/05 21:23:16 DEBUG SparkILoop$SparkILoopInterpreter: class $read extends Serializable {
def <init>() = {
super.<init>;
()
};
class $iwC extends Serializable {
def <init>() = {
super.<init>;
()
};
val $VAL2 = $line3.$read.INSTANCE;
import $VAL2.$iw.$iw.sc;
class $iwC extends Serializable {
def <init>() = {
super.<init>;
()
};
import org.apache.spark.SparkContext._;
class $iwC extends Serializable {
def <init>() = {
super.<init>;
()
};
class $iwC extends Serializable {
def <init>() = {
super.<init>;
()
};
import com.datastax.bdp.spark.CassandraFunctions._;
class $iwC extends Serializable {
def <init>() = {
super.<init>;
()
};
import com.datastax.bdp.spark.context.CassandraContext;
class $iwC extends Serializable {
def <init>() = {
super.<init>;
()
};
import com.tuplejump.calliope.Implicits._;
class $iwC extends Serializable {
def <init>() = {
super.<init>;
()
};
val $VAL3 = $line17.$read.INSTANCE;
import $VAL3.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.collection;
class $iwC extends Serializable {
def <init>() = {
super.<init>;
()
};
val rdd = sc parallelize collection
};
val $iw = new $iwC.<init>
};
val $iw = new $iwC.<init>
};
val $iw = new $iwC.<init>
};
val $iw = new $iwC.<init>
};
val $iw = new $iwC.<init>
};
val $iw = new $iwC.<init>
};
val $iw = new $iwC.<init>
};
val $iw = new $iwC.<init>
}
14/05/05 21:23:16 DEBUG SparkILoop$SparkILoopInterpreter: object $read extends scala.AnyRef {
def <init>() = {
super.<init>;
()
};
val INSTANCE = new $read.<init>
}
14/05/05 21:23:16 DEBUG SparkILoop$SparkILoopInterpreter: Set symbol of rdd to val rdd(): org.apache.spark.rdd.RDD
14/05/05 21:23:16 DEBUG SparkILoop$SparkILoopInterpreter: parse("
object $eval {
lazy val $result =
$line18.$read.INSTANCE.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.`rdd`
val $print: String = {
$read.INSTANCE.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw
(""
+ "rdd: org.apache.spark.rdd.RDD[(String, Int)] = " +
scala.runtime.ScalaRunTime.replStringOf($line18.$read.INSTANCE.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.`rdd`,
1000)
)
}
}
") Some(List(object $eval extends scala.AnyRef {
def <init>() = {
super.<init>();
()
};
lazy val $result = $line18.$read.INSTANCE.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.rdd;
val $print: String = {
$read.INSTANCE.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw;
"".$plus("rdd: org.apache.spark.rdd.RDD[(String, Int)] =
").$plus(scala.runtime.ScalaRunTime.replStringOf($line18.$read.INSTANCE.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.rdd,
1000))
}
}))
14/05/05 21:23:16 DEBUG SparkILoop$SparkILoopInterpreter: object $eval extends scala.AnyRef {
def <init>() = {
super.<init>;
()
};
lazy val $result = $line18.$read.INSTANCE.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.rdd;
val $print: String = {
$read.INSTANCE.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw;
"".+("rdd: org.apache.spark.rdd.RDD[(String, Int)] =
").+(scala.runtime.ScalaRunTime.replStringOf($line18.$read.INSTANCE.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.rdd,
1000))
}
}
14/05/05 21:23:16 DEBUG SparkILoop$SparkILoopInterpreter: Invoking: public static java.lang.String $line18.$eval.$print()
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:21
scala> rdd.map(_._2).sum
14/05/05 21:23:45 DEBUG SparkILoop$SparkILoopInterpreter: parse("
rdd.map(_._2).sum
") Some(List(rdd.map(((x$1) => x$1._2)).sum))
14/05/05 21:23:45 DEBUG SparkILoop$SparkILoopInterpreter: 21: Select
14: Apply
11: Select
7: Ident
17: Function
15: ValDef
15: TypeTree
-1: EmptyTree
17: Select
15: Ident
14/05/05 21:23:45 DEBUG SparkILoop$SparkILoopInterpreter: parse(" val res3 =
rdd.map(_._2).sum
") Some(List(val res3 = rdd.map(((x$1) => x$1._2)).sum))
14/05/05 21:23:45 DEBUG SparkILoop$SparkILoopInterpreter: 11: ValDef
11: TypeTree
46: Select
39: Apply
36: Select
32: Ident
42: Function
40: ValDef
40: TypeTree
-1: EmptyTree
42: Select
40: Ident
14/05/05 21:23:45 DEBUG SparkILoop$SparkILoopInterpreter: parse("
class $read extends Serializable {
class $iwC extends Serializable {
val $VAL4 = $line3.$read.INSTANCE;
import $VAL4.$iw.$iw.`sc`;
class $iwC extends Serializable {
import org.apache.spark.SparkContext._
class $iwC extends Serializable {
class $iwC extends Serializable {
import com.datastax.bdp.spark.CassandraFunctions._
class $iwC extends Serializable {
import com.datastax.bdp.spark.context.CassandraContext
class $iwC extends Serializable {
import com.tuplejump.calliope.Implicits._
class $iwC extends Serializable {
val $VAL5 = $line17.$read.INSTANCE;
import $VAL5.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.`collection`;
val $VAL6 = $line18.$read.INSTANCE;
import $VAL6.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.`rdd`;
class $iwC extends Serializable {
val res3 =
rdd.map(_._2).sum
}
val $iw = new $iwC;
}
val $iw = new $iwC;
}
val $iw = new $iwC;
}
val $iw = new $iwC;
}
val $iw = new $iwC;
}
val $iw = new $iwC;
}
val $iw = new $iwC;
}
val $iw = new $iwC;
}
object $read {
val INSTANCE = new $read();
}
") Some(List(class $read extends Serializable {
def <init>() = {
super.<init>();
()
};
class $iwC extends Serializable {
def <init>() = {
super.<init>();
()
};
val $VAL4 = $line3.$read.INSTANCE;
import $VAL4.$iw.$iw.sc;
class $iwC extends Serializable {
def <init>() = {
super.<init>();
()
};
import org.apache.spark.SparkContext._;
class $iwC extends Serializable {
def <init>() = {
super.<init>();
()
};
class $iwC extends Serializable {
def <init>() = {
super.<init>();
()
};
import com.datastax.bdp.spark.CassandraFunctions._;
class $iwC extends Serializable {
def <init>() = {
super.<init>();
()
};
import com.datastax.bdp.spark.context.CassandraContext;
class $iwC extends Serializable {
def <init>() = {
super.<init>();
()
};
import com.tuplejump.calliope.Implicits._;
class $iwC extends Serializable {
def <init>() = {
super.<init>();
()
};
val $VAL5 = $line17.$read.INSTANCE;
import $VAL5.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.collection;
val $VAL6 = $line18.$read.INSTANCE;
import $VAL6.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.rdd;
class $iwC extends Serializable {
def <init>() = {
super.<init>();
()
};
val res3 = rdd.map(((x$1) => x$1._2)).sum
};
val $iw = new $iwC()
};
val $iw = new $iwC()
};
val $iw = new $iwC()
};
val $iw = new $iwC()
};
val $iw = new $iwC()
};
val $iw = new $iwC()
};
val $iw = new $iwC()
};
val $iw = new $iwC()
}, object $read extends scala.AnyRef {
def <init>() = {
super.<init>();
()
};
val INSTANCE = new $read()
}))
14/05/05 21:23:45 DEBUG SparkILoop$SparkILoopInterpreter: class $read extends Serializable {
def <init>() = {
super.<init>;
()
};
class $iwC extends Serializable {
def <init>() = {
super.<init>;
()
};
val $VAL4 = $line3.$read.INSTANCE;
import $VAL4.$iw.$iw.sc;
class $iwC extends Serializable {
def <init>() = {
super.<init>;
()
};
import org.apache.spark.SparkContext._;
class $iwC extends Serializable {
def <init>() = {
super.<init>;
()
};
class $iwC extends Serializable {
def <init>() = {
super.<init>;
()
};
import com.datastax.bdp.spark.CassandraFunctions._;
class $iwC extends Serializable {
def <init>() = {
super.<init>;
()
};
import com.datastax.bdp.spark.context.CassandraContext;
class $iwC extends Serializable {
def <init>() = {
super.<init>;
()
};
import com.tuplejump.calliope.Implicits._;
class $iwC extends Serializable {
def <init>() = {
super.<init>;
()
};
val $VAL5 = $line17.$read.INSTANCE;
import $VAL5.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.collection;
val $VAL6 = $line18.$read.INSTANCE;
import $VAL6.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.rdd;
class $iwC extends Serializable {
def <init>() = {
super.<init>;
()
};
val res3 = rdd.map(((x$1) => x$1._2)).sum
};
val $iw = new $iwC.<init>
};
val $iw = new $iwC.<init>
};
val $iw = new $iwC.<init>
};
val $iw = new $iwC.<init>
};
val $iw = new $iwC.<init>
};
val $iw = new $iwC.<init>
};
val $iw = new $iwC.<init>
};
val $iw = new $iwC.<init>
}
14/05/05 21:23:45 DEBUG SparkILoop$SparkILoopInterpreter: object $read extends scala.AnyRef {
def <init>() = {
super.<init>;
()
};
val INSTANCE = new $read.<init>
}
14/05/05 21:23:45 DEBUG SparkILoop$SparkILoopInterpreter: Set symbol of res3 to val res3(): Double
14/05/05 21:23:45 DEBUG SparkILoop$SparkILoopInterpreter: parse("
object $eval {
lazy val $result =
$line19.$read.INSTANCE.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.`res3`
val $print: String = {
$read.INSTANCE.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw
(""
+ "res3: Double = " +
scala.runtime.ScalaRunTime.replStringOf($line19.$read.INSTANCE.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.`res3`,
1000)
)
}
}
") Some(List(object $eval extends scala.AnyRef {
def <init>() = {
super.<init>();
()
};
lazy val $result = $line19.$read.INSTANCE.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.res3;
val $print: String = {
$read.INSTANCE.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw;
"".$plus("res3: Double =
").$plus(scala.runtime.ScalaRunTime.replStringOf($line19.$read.INSTANCE.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.res3,
1000))
}
}))
14/05/05 21:23:45 DEBUG SparkILoop$SparkILoopInterpreter: object $eval extends scala.AnyRef {
def <init>() = {
super.<init>;
()
};
lazy val $result = $line19.$read.INSTANCE.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.res3;
val $print: String = {
$read.INSTANCE.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw;
"".+("res3: Double =
").+(scala.runtime.ScalaRunTime.replStringOf($line19.$read.INSTANCE.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.res3,
1000))
}
}
14/05/05 21:23:45 DEBUG SparkILoop$SparkILoopInterpreter: Invoking: public static java.lang.String $line19.$eval.$print()
14/05/05 21:23:46 INFO SharkContext: Starting job: sum at <console>:24
14/05/05 21:23:46 INFO DAGScheduler: Got job 0 (sum at <console>:24) with 2 output partitions (allowLocal=false)
14/05/05 21:23:46 INFO DAGScheduler: Final stage: Stage 0 (sum at <console>:24)
14/05/05 21:23:46 INFO DAGScheduler: Parents of final stage: List()
14/05/05 21:23:46 INFO DAGScheduler: Missing parents: List()
14/05/05 21:23:46 DEBUG DAGScheduler: submitStage(Stage 0)
14/05/05 21:23:46 DEBUG DAGScheduler: missing: List()
14/05/05 21:23:46 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[2] at numericRDDToDoubleRDDFunctions at <console>:24), which has no missing parents
14/05/05 21:23:46 DEBUG DAGScheduler: submitMissingTasks(Stage 0)
14/05/05 21:23:46 INFO DAGScheduler: Submitting 2 missing tasks from Stage 0 (MappedRDD[2] at numericRDDToDoubleRDDFunctions at <console>:24)
14/05/05 21:23:46 DEBUG DAGScheduler: New pending tasks: Set(ResultTask(0, 0), ResultTask(0, 1))
14/05/05 21:23:46 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
14/05/05 21:23:46 DEBUG TaskSetManager: Epoch for TaskSet 0.0: 0
14/05/05 21:23:46 DEBUG TaskSetManager: Valid locality levels for TaskSet 0.0: ANY
14/05/05 21:23:46 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0, runningTasks: 0
14/05/05 21:23:46 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0, runningTasks: 0
14/05/05 21:23:46 INFO TaskSetManager: Starting task 0.0:0 as TID 0 on executor 0: 127.0.0.1 (PROCESS_LOCAL)
14/05/05 21:23:47 INFO TaskSetManager: Serialized task 0.0:0 as 13890654 bytes in 617 ms
14/05/05 21:23:47 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0, runningTasks: 1
14/05/05 21:23:47 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0, runningTasks: 1
14/05/05 21:23:48 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0, runningTasks: 1
14/05/05 21:23:48 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0, runningTasks: 1
14/05/05 21:23:49 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0, runningTasks: 1
14/05/05 21:23:49 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0, runningTasks: 1
14/05/05 21:23:50 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0, runningTasks: 1
14/05/05 21:23:50 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0, runningTasks: 1
14/05/05 21:23:51 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0, runningTasks: 1
14/05/05 21:23:51 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0, runningTasks: 1
{noformat}
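A side note on the log above: task 0.0:0 serialized to 13,890,654 bytes, i.e. roughly 28 bytes per element for the 500,000 elements in each of the 2 tasks. If the default spark.akka.frameSize of 10 MB is the limit for shipping a serialized task, that would line up with the threshold reported in the description: 700,000 elements split into 2 tasks is about 9.7 MB per task (just under 10 MB), while 800,000 is about 11.1 MB (just over). A possible workaround sketch, untested; the master URL and app name below are placeholders:
{noformat}
// Untested sketch: raise the Akka frame size above the ~13 MB serialized
// task size before creating the context. The default is 10 (MB) in 0.9.x.
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("spark://127.0.0.1:7077")  // placeholder master URL
  .setAppName("frame-size-test")        // placeholder app name
  .set("spark.akka.frameSize", "64")    // value is in MB
val sc = new SparkContext(conf)
{noformat}
When going through bin/spark-shell, the same property could presumably be passed as a system property (e.g. SPARK_JAVA_OPTS="-Dspark.akka.frameSize=64") before launching the shell.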
How do I enable logging from the Executor? It doesn't seem to log anything anywhere, but maybe something is misconfigured.
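For reference, in standalone mode each executor's stdout/stderr should end up under the worker's work/<app-id>/<executor-id>/ directory. The sketch below, assuming the executor JVM picks up conf/log4j.properties from the Spark conf directory on its classpath, is derived from conf/log4j.properties.template with the root level raised to DEBUG:
{noformat}
# conf/log4j.properties on the worker machine, copied from
# conf/log4j.properties.template; assumes the executor JVM loads it
# from the Spark conf directory on its classpath.
log4j.rootCategory=DEBUG, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
{noformat}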
> ParallelCollectionRDD operations hanging forever without any error messages
> ----------------------------------------------------------------------------
>
> Key: SPARK-1712
> URL: https://issues.apache.org/jira/browse/SPARK-1712
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 0.9.0
> Environment: Linux Ubuntu 14.04, a single Spark node; standalone mode.
> Reporter: Piotr Kołaczkowski
> Attachments: executor.jstack.txt, master.jstack.txt, repl.jstack.txt,
> spark-hang.png, worker.jstack.txt
>
>
> {noformat}
> scala> val collection = (1 to 1000000).map(i => ("foo" + i, i)).toVector
> collection: Vector[(String, Int)] = Vector((foo1,1), (foo2,2), (foo3,3),
> (foo4,4), (foo5,5), (foo6,6), (foo7,7), (foo8,8), (foo9,9), (foo10,10),
> (foo11,11), (foo12,12), (foo13,13), (foo14,14), (foo15,15), (foo16,16),
> (foo17,17), (foo18,18), (foo19,19), (foo20,20), (foo21,21), (foo22,22),
> (foo23,23), (foo24,24), (foo25,25), (foo26,26), (foo27,27), (foo28,28),
> (foo29,29), (foo30,30), (foo31,31), (foo32,32), (foo33,33), (foo34,34),
> (foo35,35), (foo36,36), (foo37,37), (foo38,38), (foo39,39), (foo40,40),
> (foo41,41), (foo42,42), (foo43,43), (foo44,44), (foo45,45), (foo46,46),
> (foo47,47), (foo48,48), (foo49,49), (foo50,50), (foo51,51), (foo52,52),
> (foo53,53), (foo54,54), (foo55,55), (foo56,56), (foo57,57), (foo58,58),
> (foo59,59), (foo60,60), (foo61,61), (foo62,62), (foo63,63), (foo64,64),
> (foo...
> scala> val rdd = sc.parallelize(collection)
> rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24
> scala> rdd.first
> res4: (String, Int) = (foo1,1)
> scala> rdd.map(_._2).sum
> // nothing happens
> {noformat}
> CPU and I/O idle.
> Memory usage reported by JVM, after manually triggered GC:
> repl: 216 MB / 2 GB
> executor: 67 MB / 2 GB
> worker: 6 MB / 128 MB
> master: 6 MB / 128 MB
> No errors found in worker's stderr/stdout.
> It works fine with 700,000 elements, taking about 1 second to process the
> request and calculate the sum. With 700,000 items the Spark executor memory
> doesn't even exceed 300 MB out of the 2 GB available. It fails with 800,000
> items.
> Multiple parallelized collections of 700,000 items each, used at the same
> time in the same session, work fine.