Hi,

(Apologies for the long mail, but it seemed necessary to provide sufficient detail given the number of issues involved.)
I'm running into issues testing LogisticRegressionWithSGD on a two-node cluster (each node with 24 cores, and 16G of the system's 24G available to slaves).

Here's a description of the application: The model is trained on categorical features x, y, and (x,y). Each categorical feature is mapped to binary features by turning every distinct value of the category enum into a binary feature of its own (i.e. presence of that value in a record implies the corresponding feature = 1, else feature = 0; so there are as many distinct features as enum values). The training vector is laid out as [x1,x2...xn,y1,y2....yn,(x1,y1),(x2,y2)...(xn,yn)]. Each record in the training data contains only one combination (Xk,Yk) and a label. Thus, the corresponding LabeledPoint sparse vector has only 3 values set for a record: Xk, Yk, and (Xk,Yk). The total length of the vector (though sparse) is nearly 614,000. There are about 1.33 million records, coalesced into 20 partitions across the two nodes. The input data has not been cached.

(NOTE: I do realize the record and feature counts may seem large for a two-node setup, but given the memory and CPU, and the fact that I'm willing to give up some turnaround time, I don't see why tasks should inexplicably fail.)

Additional parameters include:

  spark.executor.memory = 14G
  spark.default.parallelism = 1
  spark.cores.max = 20
  spark.storage.memoryFraction = 0.8  // No cache space required

(Trying to set spark.akka.frameSize to a larger number, say 20, didn't help either.)

The model training was initialized as:

  new LogisticRegressionWithSGD(1, maxIterations, 0.0, 0.05)
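For concreteness, the featurization and training call are roughly along these lines (a minimal Scala sketch rather than the actual code -- the helper names, the cross-feature index arithmetic, and the cardinality parameters are all illustrative; LogisticRegressionWithSGD.train is the companion-object equivalent of the constructor call above):

  import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
  import org.apache.spark.mllib.linalg.Vectors
  import org.apache.spark.mllib.regression.LabeledPoint
  import org.apache.spark.rdd.RDD

  // numX and numY are the enum cardinalities, numCross the number of distinct
  // (x, y) combinations; numX + numY + numCross is nearly 614,000 here.
  def toLabeledPoint(xIdx: Int, yIdx: Int, crossIdx: Int, label: Double,
                     numX: Int, numY: Int, numCross: Int): LabeledPoint = {
    val size = numX + numY + numCross
    // Exactly three active features per record: Xk, Yk and (Xk, Yk).
    val indices = Array(xIdx, numX + yIdx, numX + numY + crossIdx)
    LabeledPoint(label, Vectors.sparse(size, indices, Array(1.0, 1.0, 1.0)))
  }

  // stepSize = 1.0, regParam = 0.0, miniBatchFraction = 0.05, matching
  // new LogisticRegressionWithSGD(1, maxIterations, 0.0, 0.05) above.
  def trainModel(points: RDD[LabeledPoint], maxIterations: Int) =
    LogisticRegressionWithSGD.train(points, maxIterations, 1.0, 0.05)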
However, after 4 iterations of gradient descent, the entire execution appeared to stall inexplicably. The task metrics of the stalled stage (Stage Id 14, "aggregate at GradientDescent.scala:178") were as follows:

  Metric                             Min     25th    Median  75th    Max
  Result serialization time          12 ms   13 ms   14 ms   16 ms   18 ms
  Duration                           4 s     4 s     5 s     5 s     5 s
  Time spent fetching task results   0 ms    0 ms    0 ms    0 ms    0 ms
  Scheduler delay                    6 s     6 s     6 s     6 s     12 s

Task details for stage 14 (the Errors column was empty throughout):

  Index  ID   Status   Locality       Executor                     Launch Time          Duration  GC Time  Result Ser Time
  0      600  RUNNING  PROCESS_LOCAL  serious.dataone.foo.bar.com  2014/06/17 10:32:27  1.1 h
  1      601  RUNNING  PROCESS_LOCAL  casual.dataone.foo.bar.com   2014/06/17 10:32:27  1.1 h
  2      602  RUNNING  PROCESS_LOCAL  serious.dataone.foo.bar.com  2014/06/17 10:32:27  1.1 h
  3      603  RUNNING  PROCESS_LOCAL  casual.dataone.foo.bar.com   2014/06/17 10:32:27  1.1 h
  4      604  RUNNING  PROCESS_LOCAL  serious.dataone.foo.bar.com  2014/06/17 10:32:27  1.1 h
  5      605  SUCCESS  PROCESS_LOCAL  casual.dataone.foo.bar.com   2014/06/17 10:32:27  4 s       2 s      12 ms
  6      606  SUCCESS  PROCESS_LOCAL  serious.dataone.foo.bar.com  2014/06/17 10:32:27  4 s       1 s      14 ms
  7      607  SUCCESS  PROCESS_LOCAL  casual.dataone.foo.bar.com   2014/06/17 10:32:27  4 s       2 s      12 ms
  8      608  SUCCESS  PROCESS_LOCAL  serious.dataone.foo.bar.com  2014/06/17 10:32:27  5 s       1 s      15 ms
  9      609  SUCCESS  PROCESS_LOCAL  casual.dataone.foo.bar.com   2014/06/17 10:32:27  5 s       1 s      14 ms
  10     610  SUCCESS  PROCESS_LOCAL  serious.dataone.foo.bar.com  2014/06/17 10:32:27  5 s       1 s      15 ms
  11     611  SUCCESS  PROCESS_LOCAL  casual.dataone.foo.bar.com   2014/06/17 10:32:27  4 s       1 s      13 ms
  12     612  SUCCESS  PROCESS_LOCAL  serious.dataone.foo.bar.com  2014/06/17 10:32:27  5 s       1 s      18 ms
  13     613  SUCCESS  PROCESS_LOCAL  casual.dataone.foo.bar.com   2014/06/17 10:32:27  5 s       1 s      13 ms
  14     614  SUCCESS  PROCESS_LOCAL  serious.dataone.foo.bar.com  2014/06/17 10:32:27  4 s       1 s      14 ms
  15     615  SUCCESS  PROCESS_LOCAL  casual.dataone.foo.bar.com   2014/06/17 10:32:27  4 s       1 s      12 ms
  16     616  SUCCESS  PROCESS_LOCAL  serious.dataone.foo.bar.com  2014/06/17 10:32:27  5 s       1 s      15 ms
  17     617  SUCCESS  PROCESS_LOCAL  casual.dataone.foo.bar.com   2014/06/17 10:32:27  5 s       1 s      18 ms
  18     618  SUCCESS  PROCESS_LOCAL  serious.dataone.foo.bar.com  2014/06/17 10:32:27  5 s       1 s      16 ms
  19     619  SUCCESS  PROCESS_LOCAL  casual.dataone.foo.bar.com   2014/06/17 10:32:27  4 s       1 s      18 ms

Executor stats:

  RDD Blocks  Memory Used     Disk Used  Active Tasks  Failed Tasks  Complete Tasks  Total Tasks  Task Time  Shuffle Read  Shuffle Write
  0           0.0 B / 6.7 GB  0.0 B      2             0             307             309          23.2 m     0.0 B         0.0 B
  0           0.0 B / 6.7 GB  0.0 B      3             0             308             311          22.4 m     0.0 B         0.0 B

Executor jmap output:

  Server compiler detected.
  JVM version is 24.55-b03

  using thread-local object allocation.
  Parallel GC with 18 thread(s)

  Heap Configuration:
     MinHeapFreeRatio = 40
     MaxHeapFreeRatio = 70
     MaxHeapSize      = 10737418240 (10240.0MB)
     NewSize          = 1310720 (1.25MB)
     MaxNewSize       = 17592186044415 MB
     OldSize          = 5439488 (5.1875MB)
     NewRatio         = 2
     SurvivorRatio    = 8
     PermSize         = 21757952 (20.75MB)
     MaxPermSize      = 134217728 (128.0MB)
     G1HeapRegionSize = 0 (0.0MB)

  Heap Usage:
  PS Young Generation
  Eden Space:
     capacity = 2783969280 (2655.0MB)
     used     = 192583816 (183.66223907470703MB)
     free     = 2591385464 (2471.337760925293MB)
     6.917598458557704% used
  From Space:
     capacity = 409993216 (391.0MB)
     used     = 1179808 (1.125152587890625MB)
     free     = 408813408 (389.8748474121094MB)
     0.2877628102022059% used
  To Space:
     capacity = 385351680 (367.5MB)
     used     = 0 (0.0MB)
     free     = 385351680 (367.5MB)
     0.0% used
  PS Old Generation
     capacity = 7158628352 (6827.0MB)
     used     = 4455093024 (4248.707794189453MB)
     free     = 2703535328 (2578.292205810547MB)
     62.2338918146983% used
  PS Perm Generation
     capacity = 90701824 (86.5MB)
     used     = 45348832 (43.248016357421875MB)
     free     = 45352992 (43.251983642578125MB)
     49.99770677158598% used

  8432 interned Strings occupying 714672 bytes.

Executor GC log snippet:

  168.778: [GC [PSYoungGen: 2702831K->578545K(2916864K)] 9302453K->7460857K(9907712K), 0.3193550 secs] [Times: user=5.13 sys=0.39, real=0.32 secs]
  169.097: [Full GC [PSYoungGen: 578545K->0K(2916864K)] [ParOldGen: 6882312K->1073297K(6990848K)] 7460857K->1073297K(9907712K) [PSPermGen: 44248K->44201K(88576K)], 4.5521090 secs] [Times: user=24.22 sys=0.18, real=4.55 secs]
  174.207: [GC [PSYoungGen: 2338304K->81315K(2544128K)] 3411653K->1154665K(9534976K), 0.0966280 secs] [Times: user=1.66 sys=0.00, real=0.09 secs]

I tried to map partitions to cores on the nodes. Increasing the number of partitions (say, to 80 or 100) resulted in progress till the 6th iteration or so, but the next stage would then stall as before, with no apparent root cause in the logs. With the increased partitions, the last stage that completed had the following task times:

  Metric                             Min     25th    Median  75th    Max
  Result serialization time          11 ms   12 ms   13 ms   15 ms   0.4 s
  Duration                           0.5 s   0.9 s   1 s     3 s     7 s
  Time spent fetching task results   0 ms    0 ms    0 ms    0 ms    0 ms
  Scheduler delay                    5 s     6 s     6 s     7 s     12 s

My hypothesis is that as the coefficient array becomes less sparse over successive iterations, the cost of the aggregate goes up to the point that execution stalls (which I'm unable to explain). For scale: a dense array of ~614,000 doubles is roughly 4.7 MB, so each task in the aggregate would be shipping a result of that order once the weights and gradients densify. Reducing the mini-batch fraction to a very low number like 0.01 let the iterations progress further, but the model then failed to converge after a small number of iterations.

I also tried reducing the number of records by aggregating on (x,y) as the key (i.e. using aggregations instead of training on every raw record), but encountered the following exception:

  Loss was due to java.lang.NullPointerException
  java.lang.NullPointerException
        at org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:750)
        at org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:750)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:59)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:96)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:95)
        at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582)
        at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.Task.run(Task.scala:51)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
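For reference, the aggregation attempt was along the following lines (again a minimal Scala sketch of the idea -- the actual job uses the Java API, as the JavaPairRDD frames show, and Record with its fields is a placeholder for the real input type):

  import org.apache.spark.SparkContext._  // PairRDDFunctions implicits
  import org.apache.spark.rdd.RDD

  // Placeholder for the real input record carrying the categorical values and label.
  case class Record(x: Int, y: Int, label: Double)

  // Collapse the raw records to one entry per (x, y) combination, keeping a
  // label sum and a count so each combination can still be weighted in training.
  def aggregateByCombination(records: RDD[Record]): RDD[((Int, Int), (Double, Long))] =
    records
      .map(r => ((r.x, r.y), (r.label, 1L)))
      .reduceByKey { case ((sum1, n1), (sum2, n2)) => (sum1 + sum2, n1 + n2) }

(pairFunToScalaFun is just the adapter that wraps a Java PairFunction into a Scala function, so the NPE presumably originates in the map-to-pair step rather than in the shuffle itself.)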
I'd appreciate any insights/comments on what may be causing the execution to stall. In case the logs/tables appear poorly indented in this email, here's a gist with the relevant details: https://gist.github.com/reachbach/a418ab2f01b639b624c1

Thanks,
Bharath