Hi Xiangrui,

I have 16 * 40 cpu cores in total. But I am only using 200 partitions on the 
200 executors. I use coalesce without shuffle to reduce the default partition 
of RDD.

The shuffle size from the WebUI is nearly 100m.

On Jul 25, 2014, at 23:51, Xiangrui Meng <men...@gmail.com> wrote:

> How many partitions did you use and how many CPU cores in total? The
> former shouldn't be much larger than the latter. Could you also check
> the shuffle size from the WebUI? -Xiangrui
> 
> On Fri, Jul 25, 2014 at 4:10 AM, Charles Li <littlee1...@gmail.com> wrote:
>> Hi Xiangrui,
>> 
>> Thanks for your treeAggregate patch. It is very helpful.
>> After applying your patch in my local repos, the new spark can handle more 
>> partition than before.
>> But after some iteration(mapPartition + reduceByKey), the reducer seems 
>> become more slower and finally hang.
>> 
>> The logs shows there always 1 message pending in the outbox, and we are 
>> waiting for it. Are you aware this kind issue?
>> How can I know which message is pending?  Where is it supposed to go?
>> 
>> Log:
>> 
>> 14/07/25 17:49:54 INFO storage.BlockManager: Found block rdd_2_158 locally
>> 14/07/25 17:50:03 INFO spark.SparkRegistry: Using kryo with register
>> 14/07/25 17:50:03 INFO spark.SparkRegistry: Using kryo with register
>> 14/07/25 17:50:03 INFO spark.SparkRegistry: Using kryo with register
>> 14/07/25 17:50:03 INFO executor.Executor: Serialized size of result for 302 
>> is 752
>> 14/07/25 17:50:03 INFO executor.Executor: Sending result for 302 directly to 
>> driver
>> 14/07/25 17:50:03 INFO executor.Executor: Finished task ID 302
>> 14/07/25 17:50:34 INFO network.ConnectionManager: Accepted connection from 
>> [*********/**********]
>> 14/07/25 17:50:34 INFO network.SendingConnection: Initiating connection to 
>> [********/************]
>> 14/07/25 17:50:34 INFO network.SendingConnection: Connected to 
>> [********/********], 1 messages pending
>> 14/07/25 17:51:28 INFO storage.ShuffleBlockManager: Deleted all files for 
>> shuffle 0
>> 14/07/25 17:51:37 INFO executor.CoarseGrainedExecutorBackend: Got assigned 
>> task 742
>> 14/07/25 17:51:37 INFO executor.Executor: Running task ID 742
>> 14/07/25 17:51:37 INFO storage.BlockManager: Found block broadcast_1 locally
>> 14/07/25 17:51:38 INFO spark.MapOutputTrackerWorker: Updating epoch to 1 and 
>> clearing cache
>> 14/07/25 17:51:38 INFO spark.SparkRegistry: Using kryo with register
>> 14/07/25 17:51:38 INFO storage.BlockManager: Found block rdd_2_158 locally
>> 14/07/25 17:51:48 INFO spark.SparkRegistry: Using kryo with register
>> 14/07/25 17:51:48 INFO spark.SparkRegistry: Using kryo with register
>> 14/07/25 17:51:48 INFO spark.SparkRegistry: Using kryo with register
>> 14/07/25 17:51:48 INFO executor.Executor: Serialized size of result for 742 
>> is 752
>> 14/07/25 17:51:48 INFO executor.Executor: Sending result for 742 directly to 
>> driver
>> 14/07/25 17:51:48 INFO executor.Executor: Finished task ID 742
>> <—— I have shutdown the App
>> 14/07/25 18:16:36 INFO executor.CoarseGrainedExecutorBackend: Driver 
>> commanded a shutdown
>> 
>> On Jul 2, 2014, at 0:08, Xiangrui Meng <men...@gmail.com> wrote:
>> 
>>> Try to reduce number of partitions to match the number of cores. We
>>> will add treeAggregate to reduce the communication cost.
>>> 
>>> PR: https://github.com/apache/spark/pull/1110
>>> 
>>> -Xiangrui
>>> 
>>> On Tue, Jul 1, 2014 at 12:55 AM, Charles Li <littlee1...@gmail.com> wrote:
>>>> Hi Spark,
>>>> 
>>>> I am running LBFGS on our user data. The data size with Kryo serialisation 
>>>> is about 210G. The weight size is around 1,300,000. I am quite confused 
>>>> that the performance is very close whether the data is cached or not.
>>>> 
>>>> The program is simple:
>>>> points = sc.hadoopFIle(int, SequenceFileInputFormat.class …..)
>>>> points.persist(StorageLevel.Memory_AND_DISK_SER()) // comment it if not 
>>>> cached
>>>> gradient = new LogisticGrandient();
>>>> updater = new SquaredL2Updater();
>>>> initWeight = Vectors.sparse(size, new int[]{}, new double[]{})
>>>> result = LBFGS.runLBFGS(points.rdd(), grandaunt, updater, numCorrections, 
>>>> convergeTol, maxIter, regParam, initWeight);
>>>> 
>>>> I have 13 machines with 16 cpus, 48G RAM each. Spark is running on its 
>>>> cluster mode. Below are some arguments I am using:
>>>> —executor-memory 10G
>>>> —num-executors 50
>>>> —executor-cores 2
>>>> 
>>>> Storage Using:
>>>> When caching:
>>>> Cached Partitions 951
>>>> Fraction Cached 100%
>>>> Size in Memory 215.7GB
>>>> Size in Tachyon 0.0B
>>>> Size on Disk 1029.7MB
>>>> 
>>>> The time cost by every aggregate is around 5 minutes with cache enabled. 
>>>> Lots of disk IOs can be seen on the hadoop node. I have the same result 
>>>> with cache disabled.
>>>> 
>>>> Should data points caching improve the performance? Should caching 
>>>> decrease the disk IO?
>>>> 
>>>> Thanks in advance.
>> 

Reply via email to