Hi,

(Apologies for the long mail, but it's necessary to provide sufficient
details considering the number of issues faced.)

I'm running into issues testing LogisticRegressionWithSGD on a two-node
cluster (each node has 24 cores, with 16G of the 24G on the system available
to slaves). Here's a description of the application:

The model is trained on the categorical features x, y, and (x,y). The
categorical features are mapped to binary features by turning each distinct
value in a category's enum into a binary feature of its own (i.e. the presence
of that value in a record implies the corresponding feature = 1, else the
feature = 0, so there are as many binary features as distinct enum values).
The training vector is laid out as
[x1,x2...xn,y1,y2....yn,(x1,y1),(x2,y2)...(xn,yn)]. Each record in the
training data carries exactly one combination (Xk,Yk) and a label, so the
corresponding LabeledPoint sparse vector has only three values set: Xk, Yk,
and (Xk,Yk). The total length of the vector (though sparse) is nearly
614,000. There are about 1.33 million records, coalesced into 20 partitions
across the two nodes. The input data has not been cached. (A sketch of the
encoding follows the note below.)
(NOTE: I do realize the number of records and features may seem large for a
two-node setup, but given the memory and CPU available, and the fact that I'm
willing to give up some turnaround time, I don't see why tasks should
inexplicably fail.)
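
For concreteness, the per-record encoding boils down to something like the
following sketch (the index values, total width, and names here are
illustrative, not the actual application code; the indices depend on how the
enums and (x,y) pairs are numbered):

import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;

// Three non-zero entries in a very wide sparse vector: the positions of Xk,
// Yk and (Xk,Yk) in the [x..., y..., (x,y)...] layout. totalFeatures is
// ~614,000 in this application.
static LabeledPoint encode(double label, int xIdx, int yIdx, int xyIdx, int totalFeatures) {
    int[] indices = new int[] { xIdx, yIdx, xyIdx };   // strictly increasing, per the layout
    double[] values = new double[] { 1.0, 1.0, 1.0 };  // presence of the value = 1.0
    return new LabeledPoint(label, Vectors.sparse(totalFeatures, indices, values));
}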

Additional parameters include:

spark.executor.memory = 14G
spark.default.parallelism = 1
spark.cores.max=20
spark.storage.memoryFraction=0.8 //No cache space required
(Setting spark.akka.frameSize to a larger value, say 20, didn't help either.)
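
For reference, the settings above are applied roughly as follows when the
context is created (the app name is a placeholder; the values are the ones
listed):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

SparkConf conf = new SparkConf()
    .setAppName("lr-sgd-test")                         // placeholder
    .set("spark.executor.memory", "14g")
    .set("spark.default.parallelism", "1")
    .set("spark.cores.max", "20")
    .set("spark.storage.memoryFraction", "0.8")
    .set("spark.akka.frameSize", "20");                // the larger value that was also tried
JavaSparkContext sc = new JavaSparkContext(conf);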

The model training was initialized as: new LogisticRegressionWithSGD(1,
maxIterations, 0.0, 0.05)
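
(If it helps to see the full call: as far as I can tell the above is
equivalent to going through the static train helper, which uses no
regularization, matching the 0.0 regParam; `training` stands for the coalesced
JavaRDD<LabeledPoint> described earlier.)

LogisticRegressionModel model = LogisticRegressionWithSGD.train(
    training.rdd(),    // RDD<LabeledPoint>, ~1.33M records in 20 partitions
    maxIterations,     // numIterations
    1.0,               // stepSize
    0.05);             // miniBatchFraction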

However, after 4 iterations of gradient descent, the entire execution
appeared to stall inexplicably. The corresponding executor details and
details of the stalled stage (number 14) are as follows:

Metric                            Min      25th     Median   75th     Max
Result serialization time         12 ms    13 ms    14 ms    16 ms    18 ms
Duration                          4 s      4 s      5 s      5 s      5 s
Time spent fetching task results  0 ms     0 ms     0 ms     0 ms     0 ms
Scheduler delay                   6 s      6 s      6 s      6 s      12 s


Stage Id
14 aggregate at GradientDescent.scala:178

Index  ID   Status   Locality Level  Executor                     Launch Time          Duration  GC Time  Result Ser Time  Errors
0      600  RUNNING  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  1.1 h     -        -                -
1      601  RUNNING  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  1.1 h     -        -                -
2      602  RUNNING  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  1.1 h     -        -                -
3      603  RUNNING  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  1.1 h     -        -                -
4      604  RUNNING  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  1.1 h     -        -                -
5      605  SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  4 s       2 s      12 ms            -
6      606  SUCCESS  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  4 s       1 s      14 ms            -
7      607  SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  4 s       2 s      12 ms            -
8      608  SUCCESS  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  5 s       1 s      15 ms            -
9      609  SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  5 s       1 s      14 ms            -
10     610  SUCCESS  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  5 s       1 s      15 ms            -
11     611  SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  4 s       1 s      13 ms            -
12     612  SUCCESS  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  5 s       1 s      18 ms            -
13     613  SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  5 s       1 s      13 ms            -
14     614  SUCCESS  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  4 s       1 s      14 ms            -
15     615  SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  4 s       1 s      12 ms            -
16     616  SUCCESS  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  5 s       1 s      15 ms            -
17     617  SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  5 s       1 s      18 ms            -
18     618  SUCCESS  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  5 s       1 s      16 ms            -
19     619  SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  4 s       1 s      18 ms            -

Executor stats:

RDD Blocks  Memory Used     Disk Used  Active Tasks  Failed Tasks  Complete Tasks  Total Tasks  Task Time  Shuffle Read  Shuffle Write
0           0.0 B / 6.7 GB  0.0 B      2             0             307             309          23.2 m     0.0 B         0.0 B
0           0.0 B / 6.7 GB  0.0 B      3             0             308             311          22.4 m     0.0 B         0.0 B


Executor jmap output:

Server compiler detected.
JVM version is 24.55-b03

using thread-local object allocation.
Parallel GC with 18 thread(s)

Heap Configuration:
   MinHeapFreeRatio = 40
   MaxHeapFreeRatio = 70
   MaxHeapSize      = 10737418240 (10240.0MB)
   NewSize          = 1310720 (1.25MB)
   MaxNewSize       = 17592186044415 MB
   OldSize          = 5439488 (5.1875MB)
   NewRatio         = 2
   SurvivorRatio    = 8
   PermSize         = 21757952 (20.75MB)
   MaxPermSize      = 134217728 (128.0MB)
   G1HeapRegionSize = 0 (0.0MB)

Heap Usage:
PS Young Generation
Eden Space:
   capacity = 2783969280 (2655.0MB)
   used     = 192583816 (183.66223907470703MB)
   free     = 2591385464 (2471.337760925293MB)
   6.917598458557704% used
From Space:
   capacity = 409993216 (391.0MB)
   used     = 1179808 (1.125152587890625MB)
   free     = 408813408 (389.8748474121094MB)
   0.2877628102022059% used
To Space:
   capacity = 385351680 (367.5MB)
   used     = 0 (0.0MB)
   free     = 385351680 (367.5MB)
   0.0% used
PS Old Generation
   capacity = 7158628352 (6827.0MB)
   used     = 4455093024 (4248.707794189453MB)
   free     = 2703535328 (2578.292205810547MB)
   62.2338918146983% used
PS Perm Generation
   capacity = 90701824 (86.5MB)
   used     = 45348832 (43.248016357421875MB)
   free     = 45352992 (43.251983642578125MB)
   49.99770677158598% used

8432 interned Strings occupying 714672 bytes.


Executor GC log snippet:

168.778: [GC [PSYoungGen: 2702831K->578545K(2916864K)]
9302453K->7460857K(9907712K), 0.3193550 secs] [Times: user=5.13 sys=0.39,
real=0.32 secs]
169.097: [Full GC [PSYoungGen: 578545K->0K(2916864K)] [ParOldGen:
6882312K->1073297K(6990848K)] 7460857K->1073297K(9907712K) [PSPermGen:
44248K->44201K(88576K)], 4.5521090 secs] [Times: user=24.22 sys=0.18,
real=4.55 secs]
174.207: [GC [PSYoungGen: 2338304K->81315K(2544128K)]
3411653K->1154665K(9534976K), 0.0966280 secs] [Times: user=1.66 sys=0.00,
real=0.09 secs]

I tried to map partitions to cores on the nodes (see the sketch after the
table below). Increasing the number of partitions (say to 80 or 100) let the
job progress to about the 6th iteration, but the next stage would then stall
as before, with no apparent root cause in the logs. With the increased
partition count, the last stage that completed had the following task times:

Metric                            Min      25th     Median   75th     Max
Result serialization time         11 ms    12 ms    13 ms    15 ms    0.4 s
Duration                          0.5 s    0.9 s    1 s      3 s      7 s
Time spent fetching task results  0 ms     0 ms     0 ms     0 ms     0 ms
Scheduler delay                   5 s      6 s      6 s      7 s      12 s
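
The partition changes themselves were nothing more elaborate than this
(sketch; `parsed` stands for the RDD of LabeledPoints prior to training):

JavaRDD<LabeledPoint> training20 = parsed.coalesce(20);     // the original 20-partition layout
JavaRDD<LabeledPoint> training80 = parsed.repartition(80);  // higher-partition runs (80, 100); repartition shuffles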

My hypothesis is that as the coefficient array becomes less sparse with
successive iterations, the cost of the aggregate grows to the point where
execution stalls (though I can't explain why it would stall outright rather
than merely slow down). Reducing the mini-batch fraction to a very low value
like 0.01 let the iterations progress further, but the model then failed to
converge after a small number of iterations.
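
To be explicit, the low-fraction experiment only changed the last argument
(0.05 -> 0.01); at ~1.33 million records that means each iteration samples
only on the order of 13,000 records, which probably explains the poor
convergence within a small number of iterations:

LogisticRegressionModel lowFractionModel = LogisticRegressionWithSGD.train(
    training.rdd(),    // same coalesced RDD<LabeledPoint> as before
    maxIterations,
    1.0,               // stepSize
    0.01);             // miniBatchFraction lowered from 0.05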


I also tried to reduce the number of records by aggregating on (x,y) as the
key (i.e. training on aggregated counts instead of every raw record), but
encountered the following exception:

Loss was due to java.lang.NullPointerException
java.lang.NullPointerException
        at org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:750)
        at org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:750)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:59)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:96)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:95)
        at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582)
        at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.Task.run(Task.scala:51)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
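
In case it helps narrow things down, that aggregation was shaped roughly like
the snippet below; the Record type, the string key, and the long[] counts are
placeholders for the actual application types (the real code keys on the
(x,y) combination). The stack trace above points at the wrapper around the
PairFunction (pairFunToScalaFun) while combineValuesByKey consumes its output.

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

// Placeholder sketch of the (x,y)-keyed aggregation attempt.
JavaPairRDD<String, long[]> byCombination = records.mapToPair(   // records: JavaRDD<Record>
    new PairFunction<Record, String, long[]>() {
        @Override
        public Tuple2<String, long[]> call(Record r) {
            // key on the (x, y) combination; value = {positive-label count, total count}
            return new Tuple2<String, long[]>(r.x + "|" + r.y,
                new long[] { r.label > 0 ? 1L : 0L, 1L });
        }
    });

JavaPairRDD<String, long[]> aggregated = byCombination.reduceByKey(
    new Function2<long[], long[], long[]>() {
        @Override
        public long[] call(long[] a, long[] b) {
            return new long[] { a[0] + b[0], a[1] + b[1] };
        }
    });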


I'd appreciate any insights/comments about what may be causing the
execution to stall.

If logs/tables appear poorly indented in the email, here's a gist with
relevant details: https://gist.github.com/reachbach/a418ab2f01b639b624c1

Thanks,
Bharath
