Hi, I have a 40G file which is a concatenation of multiple documents, I 
want to extract two features (title and tables) from each doc, so the 
program is like this:

-------------------------------------------------------------
val file = sc.textFile("/path/to/40G/file")
//file.cache()   // to enable or disable caching

// reduce 1; getTitle() is a text utility function written in Java
val titles = file.map(line => (doc_key, getTitle()))
  .reduceByKey(_ + _, 1)

// reduce 2; getTableTitle() is a text utility function written in Java
val tables = file.flatMap(line => {
  for (table <- all_tables)
    yield (doc_key, getTableTitle())
}).reduceByKey(_ + _, 1)

titles.saveAsTextFile("titles.out")   // save_1, will trigger reduce_1
tables.saveAsTextFile("tables.out")   // save_2, will trigger reduce_2
-------------------------------------------------------------
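For what it's worth, my mental model of cache() is: it only *marks* the RDD; the first action that scans it materializes the cache, and later actions reuse the stored copy. A minimal plain-Scala sketch of that model (an analogy I wrote for myself, not Spark's actual implementation; all names below are made up):

```scala
object CacheSketch {
  // A toy lazy dataset: cache() only marks it; the first action fills
  // the cache, and subsequent actions are served from it.
  class LazyDataset[T](compute: () => Seq[T]) {
    private var cached: Option[Seq[T]] = None
    private var cachingEnabled = false
    var computeCount = 0   // how many times the source was actually read

    private def data(): Seq[T] = cached match {
      case Some(d) => d                        // served from cache
      case None =>
        computeCount += 1                      // source is (re)read
        val d = compute()
        if (cachingEnabled) cached = Some(d)   // materialize on first action
        d
    }

    def cache(): this.type = { cachingEnabled = true; this }

    // An "action": each call triggers a (possibly cached) read of the source.
    def reduce(f: (T, T) => T): T = data().reduce(f)
  }

  def main(args: Array[String]): Unit = {
    val ds = new LazyDataset(() => Seq(1, 2, 3, 4)).cache()
    ds.reduce(_ + _)          // first action: reads source, fills cache
    ds.reduce(_ + _)          // second action: served from cache
    println(ds.computeCount)  // prints 1; without cache() it would be 2
  }
}
```

Under this model, save_1 pays the cost of building the cache and save_2 should only read it back, which is why the timings above surprised me.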

I expected that with file.cache(), the later reduce_2 would be faster, 
since it would read from cached data. However, the results repeatedly 
show that reduce_2 takes 3 min with cache and 1.4 min without. Why does 
reading from the cache not help in this case?
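One factor I'm considering: plain cache() (MEMORY_ONLY) stores the lines as deserialized Java String objects, which can occupy several times their on-disk size in the heap. A back-of-envelope sketch (the average line length and per-object overhead below are assumed numbers, not measurements):

```scala
object CacheSizeEstimate {
  def main(args: Array[String]): Unit = {
    val fileBytes = 40L * 1024 * 1024 * 1024   // 40 GB on disk
    val avgLineBytes = 100                     // assumed average line length
    val lines = fileBytes / avgLineBytes
    // Rough per-String overhead on a 64-bit JVM: object headers, the
    // backing char array, fields, padding -- assumed order of magnitude.
    val perLineOverhead = 48
    val charExpansion = 2                      // String chars are UTF-16
    val heapBytes = lines * (avgLineBytes * charExpansion + perLineOverhead)
    println(heapBytes / (1024L * 1024 * 1024)) // prints 99, i.e. ~2.5x on-disk
  }
}
```

If the cached blocks approach the heap limit, reading them back competes with GC and eviction, which could eat the benefit of skipping the SSD read.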

The stage GUI shows that, with cache, reduce_2 always has a wave of 
"outlier tasks": the median task duration is 2 s but the max is 1.7 min.

Metric                      Min     25th %ile   Median   75th %ile   Max
Result serialization time   0 ms    0 ms        0 ms     0 ms        1 ms
Duration                    0.6 s   2 s         2 s      2 s         1.7 min

But these tasks do not show a long GC pause (26 ms, as shown):

Index: 173 | ID: 1210 | Status: SUCCESS | Locality: PROCESS_LOCAL | 
Executor: localhost | Launch: 2014/06/17 17:49:43 | Duration: 1.7 min | 
GC Time: 26 ms | 9.4 KB
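Separately, I notice both reduceByKey calls pass numPartitions = 1, which funnels every key into a single reduce task. A tiny plain-Scala analogy of hash-partitioned reduction (my own sketch, not Spark internals) showing that one partition means one task does all the combining:

```scala
object PartitionSketch {
  // Hash-partition the records, then reduce values per key within each
  // partition. Returns one Map per "reduce task".
  def reducePartitions[K, V](records: Seq[(K, V)], numPartitions: Int)
                            (f: (V, V) => V): Seq[Map[K, V]] = {
    records
      .groupBy { case (k, _) => math.abs(k.hashCode) % numPartitions }
      .toSeq.sortBy(_._1)
      .map { case (_, part) =>
        part.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(f) }
      }
  }

  def main(args: Array[String]): Unit = {
    val data = Seq("a" -> 1, "b" -> 2, "a" -> 3, "c" -> 4)
    val parts = reducePartitions(data, 1)(_ + _)
    println(parts.length)     // prints 1: every key lands in the same task
    println(parts.head("a"))  // prints 4
  }
}
```

I don't think this explains the map-side stragglers above, but it does mean the reduce side cannot use more than one of the 32 cores.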

BTW: this is a single machine with 32 cores, 192 GB RAM, and an SSD, 
with these lines in spark-env.sh:

SPARK_WORKER_MEMORY=180g
SPARK_MEM=180g
SPARK_JAVA_OPTS="-XX:+UseG1GC -XX:MaxGCPauseMillis=500 
-XX:MaxPermSize=256m"
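(Side note to myself: the Spark 1.0 docs mark SPARK_MEM as deprecated in favor of per-application settings. The per-application equivalent would be something like the following — property names taken from the 1.0 configuration docs, so please double-check against your version:)

```
# conf/spark-defaults.conf
spark.executor.memory            180g
spark.executor.extraJavaOptions  -XX:+UseG1GC -XX:MaxGCPauseMillis=500 -XX:MaxPermSize=256m
```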


Thanks,

Wei

---------------------------------
Wei Tan, PhD
Research Staff Member
IBM T. J. Watson Research Center
http://researcher.ibm.com/person/us-wtan
