Re: Running Spark in local mode seems to ignore local[N]
Sean,

How does this model actually work? Let's say we want to run one job as N threads executing one particular task, e.g. streaming data out of Kafka into a search engine. How do we configure our Spark job execution?

Right now, I'm seeing this job running as a single thread. And it's quite a bit slower than just running a simple utility with a thread executor with a thread pool of N threads doing the same task. The performance I'm seeing of running the Kafka-Spark Streaming job is 7 times slower than that of the utility. What's pulling Spark back?

Thanks.

On Mon, May 11, 2015 at 4:55 PM, Sean Owen so...@cloudera.com wrote:
You have one worker with one executor with 32 execution slots.
Re: Running Spark in local mode seems to ignore local[N]
BTW I think my comment was wrong, as Marcelo demonstrated. In standalone mode you'd have one worker, and you do have one executor, but his explanation is right. But you certainly have execution slots for each core.

Are you talking about your own user code? You can make threads, but that's nothing to do with Spark then. If you run code on your driver, it's not distributed. If you run Spark over an RDD with 1 partition, only one task works on it.

On Mon, May 11, 2015 at 10:16 PM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote:
Sean, How does this model actually work? Let's say we want to run one job as N threads executing one particular task, e.g. streaming data out of Kafka into a search engine. How do we configure our Spark job execution? Right now, I'm seeing this job running as a single thread. And it's quite a bit slower than just running a simple utility with a thread executor with a thread pool of N threads doing the same task. The performance I'm seeing of running the Kafka-Spark Streaming job is 7 times slower than that of the utility. What's pulling Spark back? Thanks.
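The point about partitions and execution slots can be put into a few lines of arithmetic. What follows is a minimal sketch in plain Python, not Spark code: the function names `concurrent_tasks` and `scheduling_rounds` are made up for illustration, and the "rounds" figure assumes every task takes about the same time, which real jobs rarely do.

```python
import math


def concurrent_tasks(num_partitions: int, num_slots: int) -> int:
    """Tasks that can run at once: one per partition, capped by the slots."""
    return min(num_partitions, num_slots)


def scheduling_rounds(num_partitions: int, num_slots: int) -> int:
    """Rough number of rounds needed to work through every partition once.

    Assumption: all tasks take the same time, so slots free up together.
    """
    return math.ceil(num_partitions / num_slots)


# (partitions, slots): the 1-partition case is the one described in the thread.
for partitions, slots in [(1, 32), (4, 4), (10, 4)]:
    print(
        f"partitions={partitions} slots={slots} "
        f"concurrent={concurrent_tasks(partitions, slots)} "
        f"rounds={scheduling_rounds(partitions, slots)}"
    )
```

Under this model, local[32] with a 1-partition RDD still keeps only one slot busy, which matches seeing only worker-0 in the logs.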
Re: Running Spark in local mode seems to ignore local[N]
Thanks, Sean. This was not yet digested data for me :) The number of partitions in a streaming RDD is determined by the block interval and the batch interval. I have seen the bit on spark.streaming.blockInterval in the doc but I didn't connect it with the batch interval and the number of partitions.

On Mon, May 11, 2015 at 5:34 PM, Sean Owen so...@cloudera.com wrote:
You might have a look at the Spark docs to start. 1 batch = 1 RDD, but 1 RDD can have many partitions. And should, for scale. You do not submit multiple jobs to get parallelism. The number of partitions in a streaming RDD is determined by the block interval and the batch interval. If you have a batch interval of 10s and block interval of 1s you'll get 10 partitions of data in the RDD.
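The batch interval / block interval relationship is simple enough to write down. This is a minimal sketch in plain Python; `partitions_per_batch` is a hypothetical helper, not a Spark API, and it assumes a single receiver that keeps producing data for the whole batch (an interval with no data would presumably contribute no block).

```python
def partitions_per_batch(batch_interval_ms: int, block_interval_ms: int) -> int:
    """Blocks, and so partitions, that one receiver contributes to a batch RDD."""
    if batch_interval_ms <= 0 or block_interval_ms <= 0:
        raise ValueError("intervals must be positive")
    # One block is cut per block interval; a batch collects the blocks
    # produced during its batch interval.
    return batch_interval_ms // block_interval_ms


# The example from the thread: 10s batches, 1s blocks.
print(partitions_per_batch(10_000, 1_000))

# Same batch interval with a 200 ms block interval (believed to be the
# default for spark.streaming.blockInterval; check the docs for your version).
print(partitions_per_batch(10_000, 200))
```

So with everything else fixed, shrinking the block interval is one way to get more partitions per batch, and therefore more tasks that can run side by side.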
Re: Running Spark in local mode seems to ignore local[N]
You have one worker with one executor with 32 execution slots.

On Mon, May 11, 2015 at 9:52 PM, dgoldenberg dgoldenberg...@gmail.com wrote:
Hi,

Is there anything special one must do, running locally and submitting a job like so:

    spark-submit \
      --class com.myco.Driver \
      --master local[*] \
      ./lib/myco.jar

In my logs, I'm only seeing log messages with the thread identifier of Executor task launch worker-0. There are 4 cores on the machine so I expected 4 threads to be at play. Running with local[32] did not yield 32 worker threads. Any recommendations?

Thanks.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Running-Spark-in-local-mode-seems-to-ignore-local-N-tp22851.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Running Spark in local mode seems to ignore local[N]
Are you actually running anything that requires all those slots? e.g., locally, I get this with local[16], but only after I run something that actually uses those 16 slots:

Executor task launch worker-15 daemon prio=10 tid=0x7f4c80029800 nid=0x8ce waiting on condition [0x7f4c62493000]
Executor task launch worker-14 daemon prio=10 tid=0x7f4c80027800 nid=0x8cd waiting on condition [0x7f4c62594000]
Executor task launch worker-13 daemon prio=10 tid=0x7f4c80025800 nid=0x8cc waiting on condition [0x7f4c62695000]
Executor task launch worker-12 daemon prio=10 tid=0x7f4c80023800 nid=0x8cb waiting on condition [0x7f4c62796000]
Executor task launch worker-11 daemon prio=10 tid=0x7f4c80021800 nid=0x8ca waiting on condition [0x7f4c62897000]
Executor task launch worker-10 daemon prio=10 tid=0x7f4c8001f800 nid=0x8c9 waiting on condition [0x7f4c62998000]
Executor task launch worker-9 daemon prio=10 tid=0x7f4c8001d800 nid=0x8c8 waiting on condition [0x7f4c62a99000]
Executor task launch worker-8 daemon prio=10 tid=0x7f4c8001b800 nid=0x8c7 waiting on condition [0x7f4c62b9a000]
Executor task launch worker-7 daemon prio=10 tid=0x7f4c80019800 nid=0x8c6 waiting on condition [0x7f4c62c9b000]
Executor task launch worker-6 daemon prio=10 tid=0x7f4c80018000 nid=0x8c5 waiting on condition [0x7f4c62d9c000]
Executor task launch worker-5 daemon prio=10 tid=0x7f4c80011000 nid=0x8c4 waiting on condition [0x7f4c62e9d000]
Executor task launch worker-4 daemon prio=10 tid=0x7f4c8000f800 nid=0x8c3 waiting on condition [0x7f4c62f9e000]
Executor task launch worker-3 daemon prio=10 tid=0x7f4c8000e000 nid=0x8c2 waiting on condition [0x7f4c6309f000]
Executor task launch worker-2 daemon prio=10 tid=0x7f4c8000c800 nid=0x8c1 waiting on condition [0x7f4c631a]
Executor task launch worker-1 daemon prio=10 tid=0x7f4c80007800 nid=0x8c0 waiting on condition [0x7f4c632a1000]
Executor task launch worker-0 daemon prio=10 tid=0x7f4c80015800 nid=0x8bf waiting on condition [0x7f4c635f4000]

On Mon, May 11, 2015 at 1:52 PM, dgoldenberg dgoldenberg...@gmail.com wrote:
Hi,

Is there anything special one must do, running locally and submitting a job like so:

    spark-submit \
      --class com.myco.Driver \
      --master local[*] \
      ./lib/myco.jar

In my logs, I'm only seeing log messages with the thread identifier of Executor task launch worker-0. There are 4 cores on the machine so I expected 4 threads to be at play. Running with local[32] did not yield 32 worker threads. Any recommendations?

Thanks.

--
Marcelo
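The observation above, that the worker threads only show up once something actually uses the slots, is what you would expect from a pool that creates threads on demand. Here is a small analogy in plain Python, offered as a sketch only: it uses Python's `ThreadPoolExecutor`, not Spark's executor, and the thread name prefix is simply borrowed from the thread dump. The helper names `live_workers` and `run_concurrently` are made up for this example.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Name borrowed from the thread dump; this is a Python pool, not Spark's.
PREFIX = "Executor task launch worker"


def live_workers() -> int:
    """Count live threads that were created by the pool below."""
    return sum(t.name.startswith(PREFIX) for t in threading.enumerate())


def run_concurrently(pool: ThreadPoolExecutor, n: int) -> int:
    """Run n tasks that wait for each other, then report the pool's thread count."""
    barrier = threading.Barrier(n)
    # Each task blocks until all n tasks are running, so n threads are needed.
    futures = [pool.submit(barrier.wait, 30) for _ in range(n)]
    for future in futures:
        future.result()
    return live_workers()


with ThreadPoolExecutor(max_workers=16, thread_name_prefix=PREFIX) as pool:
    before = live_workers()                 # nothing submitted yet
    after_one = run_concurrently(pool, 1)   # a single task at a time
    after_all = run_concurrently(pool, 16)  # enough work to fill every slot
    print(before, after_one, after_all)
```

On CPython this should print `0 1 16`: a pool sized for 16 starts with no threads, a workload with one task at a time only ever shows one worker, and all 16 appear only when 16 tasks run together. One difference worth noting is that Python keeps idle pool threads alive, whereas a JVM pool may retire them after a timeout.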
Re: Running Spark in local mode seems to ignore local[N]
Understood. We'll use the multi-threaded code we already have.

How are these execution slots filled up? I assume each slot is dedicated to one submitted task. If that's the case, how is each task distributed then, i.e. how is that task run in a multi-node fashion? Say 1000 batches/RDDs are extracted out of Kafka, how does that relate to the number of executors vs. task slots?

Presumably we can fill up the slots with multiple instances of the same task... How do we know how many to launch?

On Mon, May 11, 2015 at 5:20 PM, Sean Owen so...@cloudera.com wrote:
BTW I think my comment was wrong, as Marcelo demonstrated. In standalone mode you'd have one worker, and you do have one executor, but his explanation is right. But you certainly have execution slots for each core. Are you talking about your own user code? You can make threads, but that's nothing to do with Spark then. If you run code on your driver, it's not distributed. If you run Spark over an RDD with 1 partition, only one task works on it.
Re: Running Spark in local mode seems to ignore local[N]
Seems to be running OK with 4 threads, 16 threads... While running with 32 threads I started getting the below.

15/05/11 19:48:46 WARN executor.Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Error sending message [message = Heartbeat(driver,[Lscala.Tuple2;@7668b255,BlockManagerId(driver, localhost, 43318))]
    at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209)
    at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
Caused by: akka.pattern.AskTimeoutException: Recipient[Actor[akka://sparkDriver/user/HeartbeatReceiver#-677986522]] had already been terminated.
    at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:132)
    at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:194)
    ... 1 more

On Mon, May 11, 2015 at 5:34 PM, Sean Owen so...@cloudera.com wrote:
You might have a look at the Spark docs to start. 1 batch = 1 RDD, but 1 RDD can have many partitions. And should, for scale. You do not submit multiple jobs to get parallelism. The number of partitions in a streaming RDD is determined by the block interval and the batch interval. If you have a batch interval of 10s and block interval of 1s you'll get 10 partitions of data in the RDD.