RE: Which strategy is used for broadcast variables?
Hi Tom,

That's an outdated document from 4-5 years ago. Spark currently uses a BitTorrent-like mechanism that has been tuned for datacenter environments.

Mosharaf

-----Original Message-----
From: Tom thubregt...@gmail.com
Sent: 3/11/2015 4:58 PM
To: user@spark.apache.org
Subject: Which strategy is used for broadcast variables?

In "Performance and Scalability of Broadcast in Spark" by Mosharaf Chowdhury, I read that Spark uses HDFS for its broadcast variables. This seems highly inefficient. In the same paper, alternatives are proposed, among which BitTorrent Broadcast (BTB). While studying Learning Spark (page 105, second paragraph on Broadcast Variables), I read: "The value is sent to each node only once, using an efficient, BitTorrent-like communication mechanism."

- Is the book talking about the proposed BTB from the paper?
- Is this currently the default?
- If not, what is?

Thanks,
Tom

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Which-strategy-is-used-for-broadcast-variables-tp22004.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
Re: Which strategy is used for broadcast variables?
The current broadcast algorithm in Spark approximates the one described in Section 5 of this paper: http://www.mosharaf.com/wp-content/uploads/orchestra-sigcomm11.pdf. It is expected to scale sub-linearly, i.e., O(log N), where N is the number of machines in your cluster. We evaluated up to 100 machines, and it does follow O(log N) scaling.

--
Mosharaf Chowdhury
http://www.mosharaf.com/

On Wed, Mar 11, 2015 at 3:11 PM, Tom Hubregtsen thubregt...@gmail.com wrote:

Thanks Mosharaf, for the quick response! Can you maybe give me some pointers to an explanation of this strategy? Or elaborate a bit more on it? Which parts are involved, and in which way? Where are the time penalties, and how scalable is this implementation?

Thanks again,
Tom
Re: How do Broadcast variables scale?
Hi Guillermo,

The current broadcast algorithm in Spark approximates the one described in Section 5 of this paper: http://www.mosharaf.com/wp-content/uploads/orchestra-sigcomm11.pdf. It is expected to scale sub-linearly, i.e., O(log N), where N is the number of machines in your cluster. We evaluated up to 100 machines, and it does follow O(log N) scaling.

Have you tried it on your 300-machine cluster? I'm curious to know what happened.

-Mosharaf

On Mon, Feb 23, 2015 at 8:06 AM, Guillermo Ortiz konstt2...@gmail.com wrote:

I'm looking for information about how broadcast variables scale in Spark and which algorithm they use. I have found http://www.cs.berkeley.edu/~agearh/cs267.sp10/files/mosharaf-spark-bc-report-spring10.pdf but I don't know if it describes the current version (1.2.1), because the file was created in 2010. I took a look at the documentation and API, and I read that there is a TorrentFactory for broadcast variables; is that what Spark uses right now? In the article they say that Spark uses another one (Centralized HDFS Broadcast). How does the current algorithm scale if I have a big cluster (about 300 nodes)? Is it linear? Are there other options to choose other algorithms?
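As a rough intuition for the O(log N) claim above, here is a toy model (not Spark's actual implementation): once a machine has received the broadcast data it can serve one more machine in the next round, so the number of sources roughly doubles each round, and covering N machines takes about ceil(log2 N) rounds.

```python
import math

def rounds_to_broadcast(n_machines: int) -> int:
    """Count rounds needed when every machine holding the data
    serves exactly one new machine per round (sources double)."""
    have = 1      # initially only the driver has the data
    rounds = 0
    while have < n_machines:
        have = min(2 * have, n_machines)  # each source feeds one new peer
        rounds += 1
    return rounds

print(rounds_to_broadcast(100))  # 7 rounds for 100 machines
print(rounds_to_broadcast(300))  # 9 rounds for 300 machines
```

In this model going from 100 to 300 machines adds only two rounds, which is the sub-linear behavior Mosharaf describes; real clusters will deviate because of chunking, stragglers, and cross-rack bandwidth.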
Re: Running the BroadcastTest.scala with TorrentBroadcastFactory in a standalone cluster
Hi Jack,

1. Several previous instances of the "key not valid?" error have been attributed to memory issues, either memory allocated per executor or per task, depending on the context. You can google it to see some examples.

2. I think your case is similar, even though it's happening due to broadcast. I suspect specifically this line after the driver commanded a shutdown: "14/07/02 18:20:09 INFO BlockManagerMaster: Updated info of block broadcast_2_piece0". It's happening only for TorrentBroadcast, because HttpBroadcast does not store intermediate chunks of a broadcast in memory.

3. You might want to allocate more memory to Spark executors to take advantage of in-memory processing. ~300MB of caching space per machine is likely to be too small for most jobs.

4. Another common cause of disconnection is the spark.akka.frameSize parameter. You can try playing with it. While you don't have enough memory to crank it up, you can try moving it up and down within reason.

5. There is one more curious line in your trace: "14/07/02 18:20:06 INFO BlockManager: Removing broadcast 0". Nothing should have been there to remove in the first place.

6. Finally, we found in our benchmarks that using TorrentBroadcast in smaller clusters (<10 nodes) with small data sizes (<10MB) has no benefit over HttpBroadcast, and it is often worse. I'd suggest sticking to HttpBroadcast unless you have a gigantic broadcast (>=1GB) or too many nodes (many 10s or 100s).

Hope it helps,
Mosharaf

--
Mosharaf Chowdhury
http://www.mosharaf.com/

On Thu, Jul 3, 2014 at 7:48 AM, jackxucs jackx...@gmail.com wrote:

Hello,

I am running the BroadcastTest example in a standalone cluster using spark-submit. I have 8 host machines and made Host1 the master. Host2 to Host8 act as 7 workers connecting to the master. The connection was fine, as I could see all 7 hosts on the master web UI. The BroadcastTest example with Http broadcast also works fine, I think, as there was no error message and all workers EXITED at the end.
But when I changed the third argument from "Http" to "Torrent" to use Torrent broadcast, all workers got a KILLED status once they reached sc.stop(). Below is the stderr on one of the workers when running Torrent broadcast (I masked the IP addresses):

==========
14/07/02 18:20:03 INFO SecurityManager: Changing view acls to: root
14/07/02 18:20:03 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root)
14/07/02 18:20:04 INFO Slf4jLogger: Slf4jLogger started
14/07/02 18:20:04 INFO Remoting: Starting remoting
14/07/02 18:20:04 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://driverPropsFetcher@dyn-xxx-xx-xx-xx:37771]
14/07/02 18:20:04 INFO Remoting: Remoting now listens on addresses: [akka.tcp://driverPropsFetcher@dyn-xxx-xx-xx-xx:37771]
14/07/02 18:20:04 INFO SecurityManager: Changing view acls to: root
14/07/02 18:20:04 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root)
14/07/02 18:20:04 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
14/07/02 18:20:04 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
14/07/02 18:20:04 INFO Slf4jLogger: Slf4jLogger started
14/07/02 18:20:04 INFO Remoting: Starting remoting
14/07/02 18:20:04 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkExecutor@dyn-xxx-xx-xx-xx:53661]
14/07/02 18:20:04 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkExecutor@dyn-xxx-xx-xx-xx:53661]
14/07/02 18:20:04 INFO CoarseGrainedExecutorBackend: Connecting to driver: akka.tcp://spark@dyn-xxx-xx-xx-xx:42436/user/CoarseGrainedScheduler
14/07/02 18:20:04 INFO WorkerWatcher: Connecting to worker akka.tcp://sparkWorker@dyn-xxx-xx-xx-xx:32818/user/Worker
14/07/02 18:20:04 INFO Remoting: Remoting shut down
14/07/02 18:20:04 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
14/07/02 18:20:04 INFO WorkerWatcher: Successfully connected to akka.tcp://sparkWorker@dyn-xxx-xx-xx-xx:32818/user/Worker
14/07/02 18:20:04 INFO CoarseGrainedExecutorBackend: Successfully registered with driver
14/07/02 18:20:04 INFO SecurityManager: Changing view acls to: root
14/07/02 18:20:04 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root)
14/07/02 18:20:04 INFO Slf4jLogger: Slf4jLogger started
14/07/02 18:20:04 INFO Remoting: Starting remoting
14/07/02 18:20:05 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@dyn-xxx-xx-xx-xx:57883]
14/07/02 18:20:05 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@dyn-xxx-xx-xx-xx:57883]
14/07/02 18:20:05 INFO SparkEnv: Connecting to MapOutputTracker
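Points 3, 4, and 6 in Mosharaf's reply map onto Spark configuration properties. The sketch below uses property names from the Spark 1.x era under discussion (spark.akka.frameSize and spark.broadcast.factory were later removed); the values are illustrative placeholders to tune for your cluster, not recommendations.

```shell
# spark-defaults.conf sketch, Spark 1.x era (values illustrative; tune per cluster)

# Point 3: give executors more than the ~300MB caching space seen here
spark.executor.memory    4g

# Point 4: Akka frame size in MB; try adjusting it within reason
spark.akka.frameSize     64

# Point 6: stick to HTTP broadcast for small clusters and small broadcasts
spark.broadcast.factory  org.apache.spark.broadcast.HttpBroadcastFactory
```

The same properties can also be passed per job via spark-submit with repeated --conf key=value flags instead of editing spark-defaults.conf.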
Re: sync master with slaves with bittorrent?
Good catch. In that case, using BitTornado/murder would be better.

--
Mosharaf Chowdhury
http://www.mosharaf.com/

On Mon, May 19, 2014 at 11:17 AM, Aaron Davidson ilike...@gmail.com wrote:

On the ec2 machines, you can update the slaves from the master using something like ~/spark-ec2/copy-dir ~/spark. Spark's TorrentBroadcast relies on the Block Manager to distribute blocks, making it relatively hard to extract.

On Mon, May 19, 2014 at 12:36 AM, Daniel Mahler dmah...@gmail.com wrote:

btw is there a command or script to update the slaves from the master?

thanks
Daniel

On Mon, May 19, 2014 at 1:48 AM, Andrew Ash and...@andrewash.com wrote:

If the codebase for Spark's broadcast is pretty self-contained, you could consider creating a small bootstrap sent out via the doubling rsync strategy that Mosharaf outlined above (called Tree D=2 in the paper) that then pulled the larger data.

Mosharaf, do you have a sense of whether the gains from using Cornet vs Tree D=2 with rsync outweigh the overhead of using a 2-phase broadcast mechanism?

Andrew

On Sun, May 18, 2014 at 11:32 PM, Aaron Davidson ilike...@gmail.com wrote:

One issue with using Spark itself is that this rsync is required to get Spark to work... Also note that a similar strategy is used for *updating* the Spark cluster on ec2, where the diff aspect is much more important, as you might only make a small change on the driver node (recompile or reconfigure) and can get a fast sync.

On Sun, May 18, 2014 at 11:22 PM, Mosharaf Chowdhury mosharafka...@gmail.com wrote:

What Twitter calls murder, unless it has changed since then, is just a BitTornado wrapper. In 2011, we did some comparison of the performance of murder and the TorrentBroadcast we have right now for Spark's own broadcast (Section 7.1 in http://www.mosharaf.com/wp-content/uploads/orchestra-sigcomm11.pdf). Spark's implementation was 4.5X faster than murder.
The only issue with using TorrentBroadcast to deploy code/VMs is writing a wrapper around it to read from disk, but it shouldn't be too complicated. If someone picks it up, I can give some pointers on how to proceed (I've thought about doing it myself forever, but never ended up actually taking the time; right now I don't have enough free cycles either). Otherwise, murder/BitTornado would be better than the current strategy we have.

A third option would be to use rsync; but instead of rsync-ing to every slave from the master, one can simply rsync from the master first to one slave; then use the two sources (master and the first slave) to rsync to two more; then four, and so on. It might be a simpler solution without many changes.

--
Mosharaf Chowdhury
http://www.mosharaf.com/

On Sun, May 18, 2014 at 11:07 PM, Andrew Ash and...@andrewash.com wrote:

My first thought would be to use libtorrent for this setup, and it turns out that both Twitter and Facebook do code deploys with a BitTorrent setup. Twitter even released their code as open source:
https://blog.twitter.com/2010/murder-fast-datacenter-code-deploys-using-bittorrent
http://arstechnica.com/business/2012/04/exclusive-a-behind-the-scenes-look-at-facebook-release-engineering/

On Sun, May 18, 2014 at 10:44 PM, Daniel Mahler dmah...@gmail.com wrote:

I am not an expert in this space either. I thought the initial rsync during launch is really just a straight copy that does not need the tree diff. So it seemed like having the slaves do the copying among each other would be better than having the master copy to everyone directly. That made me think of BitTorrent, though there may well be other systems that do this. From the launches I did today, it seems to be taking around 1 minute per slave to launch a cluster, which can be a problem for clusters with 10s or 100s of slaves, particularly since on ec2 that time has to be paid for.
On Sun, May 18, 2014 at 11:54 PM, Aaron Davidson ilike...@gmail.com wrote:

Out of curiosity, do you have a library in mind that would make it easy to set up a BitTorrent network and distribute files in an rsync (i.e., apply a diff to a tree, ideally) fashion? I'm not familiar with this space, but we do want to minimize the complexity of our standard ec2 launch scripts to reduce the chance of something breaking.

On Sun, May 18, 2014 at 9:22 PM, Daniel Mahler dmah...@gmail.com wrote:

I am launching a rather large cluster on ec2. It seems like the launch is taking forever on:

Setting up spark
RSYNC'ing /root/spark to slaves...
...

It seems that BitTorrent might be a faster way to replicate the sizeable spark directory to the slaves, particularly if there are a lot of not-very-powerful slaves. Just a thought...

cheers
Daniel
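The doubling-rsync idea from this thread (master syncs to one slave, then both push onward, and so on) can be sketched as a round planner. This is a toy Python sketch, not an existing script; the actual rsync invocations are only indicated in a comment, and hostnames are hypothetical.

```python
def doubling_rsync_plan(master, slaves):
    """Plan rsync rounds where every host that already has the tree
    pushes to one new host per round, so the sources double each round."""
    sources = [master]       # hosts that already have the directory
    pending = list(slaves)   # hosts still waiting for it
    rounds = []
    while pending:
        batch = []
        for src in list(sources):   # iterate a snapshot of this round's sources
            if not pending:
                break
            dst = pending.pop(0)
            # here one would run, e.g.: ssh src 'rsync -az /root/spark dst:/root/'
            batch.append((src, dst))
            sources.append(dst)     # dst can serve as a source next round
        rounds.append(batch)
    return rounds

plan = doubling_rsync_plan("master", ["slave%d" % i for i in range(1, 8)])
print([len(r) for r in plan])  # [1, 2, 4]: 7 slaves in 3 rounds, not 7 serial copies
```

With hundreds of slaves this model needs only about log2(N) rounds, which is the same doubling behavior as the Tree D=2 scheme mentioned above; in practice round times would be limited by the slowest transfer in each batch.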