Hi Xiangrui,

I have 16 * 40 CPU cores in total, but I am only using 200 partitions on the 200 executors. I use coalesce without shuffle to reduce the default number of RDD partitions (rough sketch below).
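For reference, the coalesce step looks roughly like this. This is only a minimal, self-contained sketch, not my actual job; the app name, input path, and class name are placeholders:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class CoalesceSketch {
  public static void main(String[] args) {
    // Sketch only: the app name and input path are placeholders.
    JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("coalesce-sketch"));
    JavaRDD<String> raw = sc.textFile("hdfs:///path/to/points");

    // coalesce(200, false): merge the default partitions down to 200
    // without a shuffle, so the partition count matches the 200 executors.
    JavaRDD<String> points = raw.coalesce(200, false);

    System.out.println("partitions = " + points.partitions().size()); // expect 200
    sc.stop();
  }
}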
The shuffle size from the WebUI is nearly 100 MB.

On Jul 25, 2014, at 23:51, Xiangrui Meng <men...@gmail.com> wrote:

> How many partitions did you use and how many CPU cores in total? The
> former shouldn't be much larger than the latter. Could you also check
> the shuffle size from the WebUI? -Xiangrui
>
> On Fri, Jul 25, 2014 at 4:10 AM, Charles Li <littlee1...@gmail.com> wrote:
>> Hi Xiangrui,
>>
>> Thanks for your treeAggregate patch. It is very helpful.
>> After applying your patch in my local repo, the new Spark can handle more
>> partitions than before.
>> But after some iterations (mapPartitions + reduceByKey), the reducer seems
>> to become slower and slower and finally hangs.
>>
>> The logs show there is always 1 message pending in the outbox, and we are
>> waiting for it. Are you aware of this kind of issue?
>> How can I know which message is pending? Where is it supposed to go?
>>
>> Log:
>>
>> 14/07/25 17:49:54 INFO storage.BlockManager: Found block rdd_2_158 locally
>> 14/07/25 17:50:03 INFO spark.SparkRegistry: Using kryo with register
>> 14/07/25 17:50:03 INFO spark.SparkRegistry: Using kryo with register
>> 14/07/25 17:50:03 INFO spark.SparkRegistry: Using kryo with register
>> 14/07/25 17:50:03 INFO executor.Executor: Serialized size of result for 302 is 752
>> 14/07/25 17:50:03 INFO executor.Executor: Sending result for 302 directly to driver
>> 14/07/25 17:50:03 INFO executor.Executor: Finished task ID 302
>> 14/07/25 17:50:34 INFO network.ConnectionManager: Accepted connection from [*********/**********]
>> 14/07/25 17:50:34 INFO network.SendingConnection: Initiating connection to [********/************]
>> 14/07/25 17:50:34 INFO network.SendingConnection: Connected to [********/********], 1 messages pending
>> 14/07/25 17:51:28 INFO storage.ShuffleBlockManager: Deleted all files for shuffle 0
>> 14/07/25 17:51:37 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 742
>> 14/07/25 17:51:37 INFO executor.Executor: Running task ID 742
>> 14/07/25 17:51:37 INFO storage.BlockManager: Found block broadcast_1 locally
>> 14/07/25 17:51:38 INFO spark.MapOutputTrackerWorker: Updating epoch to 1 and clearing cache
>> 14/07/25 17:51:38 INFO spark.SparkRegistry: Using kryo with register
>> 14/07/25 17:51:38 INFO storage.BlockManager: Found block rdd_2_158 locally
>> 14/07/25 17:51:48 INFO spark.SparkRegistry: Using kryo with register
>> 14/07/25 17:51:48 INFO spark.SparkRegistry: Using kryo with register
>> 14/07/25 17:51:48 INFO spark.SparkRegistry: Using kryo with register
>> 14/07/25 17:51:48 INFO executor.Executor: Serialized size of result for 742 is 752
>> 14/07/25 17:51:48 INFO executor.Executor: Sending result for 742 directly to driver
>> 14/07/25 17:51:48 INFO executor.Executor: Finished task ID 742
>> <-- I have shut down the app
>> 14/07/25 18:16:36 INFO executor.CoarseGrainedExecutorBackend: Driver commanded a shutdown
>>
>> On Jul 2, 2014, at 0:08, Xiangrui Meng <men...@gmail.com> wrote:
>>
>>> Try to reduce the number of partitions to match the number of cores. We
>>> will add treeAggregate to reduce the communication cost.
>>>
>>> PR: https://github.com/apache/spark/pull/1110
>>>
>>> -Xiangrui
>>>
>>> On Tue, Jul 1, 2014 at 12:55 AM, Charles Li <littlee1...@gmail.com> wrote:
>>>> Hi Spark,
>>>>
>>>> I am running LBFGS on our user data. The data size with Kryo serialisation
>>>> is about 210G. The weight size is around 1,300,000. I am quite confused
>>>> that the performance is very close whether the data is cached or not.
>>>>
>>>> The program is simple:
>>>> points = sc.hadoopFile(int, SequenceFileInputFormat.class …..)
>>>> points.persist(StorageLevel.MEMORY_AND_DISK_SER()) // comment it out if not cached
>>>> gradient = new LogisticGradient();
>>>> updater = new SquaredL2Updater();
>>>> initWeight = Vectors.sparse(size, new int[]{}, new double[]{})
>>>> result = LBFGS.runLBFGS(points.rdd(), gradient, updater, numCorrections,
>>>>     convergeTol, maxIter, regParam, initWeight);
>>>>
>>>> I have 13 machines with 16 CPUs and 48G RAM each. Spark is running in its
>>>> cluster mode. Below are some arguments I am using:
>>>> --executor-memory 10G
>>>> --num-executors 50
>>>> --executor-cores 2
>>>>
>>>> Storage usage when caching:
>>>> Cached Partitions 951
>>>> Fraction Cached 100%
>>>> Size in Memory 215.7GB
>>>> Size in Tachyon 0.0B
>>>> Size on Disk 1029.7MB
>>>>
>>>> The time taken by every aggregate is around 5 minutes with caching enabled.
>>>> Lots of disk IO can be seen on the Hadoop nodes. I get the same result
>>>> with caching disabled.
>>>>
>>>> Should caching the data points improve the performance? Should caching
>>>> decrease the disk IO?
>>>>
>>>> Thanks in advance.
>>
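For completeness, a self-contained, runnable version of the snippet quoted above would look roughly like this in Java. The tiny in-memory data set and the hyperparameter values are only placeholders; the real job reads the 210G data set from HDFS:

import java.util.Arrays;
import scala.Tuple2;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.optimization.LBFGS;
import org.apache.spark.mllib.optimization.LogisticGradient;
import org.apache.spark.mllib.optimization.SquaredL2Updater;
import org.apache.spark.storage.StorageLevel;

public class LbfgsSketch {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("lbfgs-sketch"));

    // Tiny in-memory stand-in for the real (label, features) data set.
    int numFeatures = 3;
    JavaRDD<Tuple2<Object, Vector>> points = sc.parallelize(Arrays.asList(
        new Tuple2<Object, Vector>(1.0, Vectors.dense(1.0, 0.0, 3.0)),
        new Tuple2<Object, Vector>(0.0, Vectors.dense(0.0, 2.0, 1.0))));

    // Cache the training data; comment this out to compare cached vs. uncached runs.
    points.persist(StorageLevel.MEMORY_AND_DISK_SER());

    Vector initWeights = Vectors.sparse(numFeatures, new int[]{}, new double[]{});

    // Placeholder hyperparameters, not tuned values.
    Tuple2<Vector, double[]> result = LBFGS.runLBFGS(
        points.rdd(),
        new LogisticGradient(),
        new SquaredL2Updater(),
        10,     // numCorrections
        1e-4,   // convergence tolerance
        50,     // max iterations
        0.1,    // regParam
        initWeights);

    System.out.println("weights = " + result._1());
    sc.stop();
  }
}

The cached-vs-uncached comparison described in the quoted mail corresponds to commenting out the persist(...) line.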