RE: question on SPARK_WORKER_CORES

2017-02-17 Thread Satish Lalam
Have you tried passing --executor-cores or --total-executor-cores as arguments,
depending on the Spark version?
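
For example, in standalone mode something like the following (the master URL and core counts are placeholders) caps the cores per executor and for the whole application; note that the number of concurrent tasks is also bounded by the number of partitions in the stage:

spark-shell --master spark://<master-host>:7077 \
  --executor-cores 4 \
  --total-executor-cores 16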


From: kant kodali [mailto:kanth...@gmail.com]
Sent: Friday, February 17, 2017 5:03 PM
To: Alex Kozlov 
Cc: user @spark 
Subject: Re: question on SPARK_WORKER_CORES

Standalone.

On Fri, Feb 17, 2017 at 5:01 PM, Alex Kozlov wrote:
What Spark mode are you running the program in?

On Fri, Feb 17, 2017 at 4:55 PM, kant kodali wrote:
when I submit a job using spark shell I get something like this


[Stage 0:>(36814 + 4) / 220129]



Now all I want is to increase the number of parallel tasks running from 4 to
16, so I exported an env variable called SPARK_WORKER_CORES=16 in
conf/spark-env.sh. I thought that should do it, but it doesn't. It still shows
me 4. Any idea?



Thanks much!




--
Alex Kozlov
(408) 507-4987
(650) 887-2135 efax
ale...@gmail.com



Re: Question about Parallel Stages in Spark

2017-06-27 Thread satish lalam
Thanks Bryan. This is one Spark application with one job. This job has 3
stages. The first 2 are basic reads from Cassandra tables and the 3rd is a
join between the two. I was expecting the first 2 stages to run in
parallel; however, they run serially. The job has enough resources.
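
For reference, a minimal sketch of submitting the two reads from separate threads so the driver can schedule both jobs at once - keyspace/table names are placeholders, and it assumes an existing spark session plus the DataStax spark-cassandra-connector on the classpath:

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Placeholder read - substitute the real keyspace/table names.
def readTable(table: String) =
  spark.read.format("org.apache.spark.sql.cassandra")
    .options(Map("keyspace" -> "my_ks", "table" -> table))
    .load()

// Materialize each table on its own thread so two independent jobs are
// submitted at the same time instead of one after the other.
val left  = Future { readTable("table_a").cache().count() }
val right = Future { readTable("table_b").cache().count() }
Await.result(Future.sequence(Seq(left, right)), Duration.Inf)
// The join can then reuse the two cached inputs.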

On Tue, Jun 27, 2017 at 4:03 AM, Bryan Jeffrey <bryan.jeff...@gmail.com>
wrote:

> Satish,
>
> Are these two separate applications submitted to the Yarn scheduler? If so,
> then you would expect to see the original case run in parallel.
>
> However, if this is one application, your submission to Yarn guarantees
> that this application will contend fairly with other applications for
> resources. However, the internal output operations within your
> application (jobs) will be scheduled by the driver (running on a single
> AM). This means that whatever driver options and code you've set will
> impact the application, but the Yarn scheduler will not have an impact (beyond
> allocating cores, memory, etc. between applications).
>
>
>
> On Tue, Jun 27, 2017 at 2:33 AM -0400, "satish lalam" <
> satish.la...@gmail.com> wrote:
>
> Thanks All. To reiterate - stages inside a job can be run in parallel as
>> long as (a) there is no sequential dependency and (b) the job has sufficient
>> resources.
>> However, my code was launching 2 jobs and they are sequential, as you
>> rightly pointed out.
>> The issue I was trying to highlight with that piece of pseudocode,
>> however, was that I am observing a job with 2 stages which don't depend on
>> each other (they both read data from 2 separate tables in a db); they
>> are both scheduled and both stages get resources - but the 2nd stage really
>> does not pick up until the 1st stage is complete. It might be due to the db
>> driver - I will post it to the right forum. Thanks.
>>
>> On Mon, Jun 26, 2017 at 9:12 PM, Pralabh Kumar <pralabhku...@gmail.com>
>> wrote:
>>
>>> I think my words were also misunderstood. My point is that they will not be
>>> submitted together, since they are part of one thread.
>>>
>>> val spark =  SparkSession.builder()
>>>   .appName("practice")
>>>   .config("spark.scheduler.mode","FAIR")
>>>   .enableHiveSupport().getOrCreate()
>>> val sc = spark.sparkContext
>>> sc.parallelize(List(1.to(1000))).map(s=>Thread.sleep(1)).collect()
>>> sc.parallelize(List(1.to(1000))).map(s=>Thread.sleep(1)).collect()
>>> Thread.sleep(1000)
>>>
>>>
>>> I ran this, and the submit times are different for the two jobs.
>>>
>>> Please let me know if I am wrong.
>>>
>>> On Tue, Jun 27, 2017 at 9:17 AM, 萝卜丝炒饭 <1427357...@qq.com> wrote:
>>>
>>>> My words caused a misunderstanding.
>>>> Step 1: A is submitted to Spark.
>>>> Step 2: B is submitted to Spark.
>>>>
>>>> Spark gets two independent jobs. The FAIR scheduler is used to schedule A and B.
>>>>
>>>> Jeffrey's code did not cause two submits.
>>>>
>>>>
>>>>
>>>> ---Original---
>>>> *From:* "Pralabh Kumar"<pralabhku...@gmail.com>
>>>> *Date:* 2017/6/27 12:09:27
>>>> *To:* "萝卜丝炒饭"<1427357...@qq.com>;
>>>> *Cc:* 
>>>> "user"<user@spark.apache.org>;"satishl"<satish.la...@gmail.com>;"Bryan
>>>> Jeffrey"<bryan.jeff...@gmail.com>;
>>>> *Subject:* Re: Question about Parallel Stages in Spark
>>>>
>>>> Hi
>>>>
>>>> I don't think Spark will receive two submits. It will execute one
>>>> submit and then move to the next one. If the application is
>>>> multithreaded, and two threads call spark submit at the same time, then
>>>> they will run in parallel, provided the scheduler is FAIR and task slots are
>>>> available.
>>>>
>>>> But in one thread, one submit will complete and then the next one
>>>> will start. If there are independent stages in one job, then those will
>>>> run in parallel.
>>>>
>>>> I agree with Bryan Jeffrey .
>>>>
>>>>
>>>> Regards
>>>> Pralabh Kumar
>>>>
>>>> On Tue, Jun 27, 2017 at 9:03 AM, 萝卜丝炒饭 <1427357...@qq.com> wrote:
>>>>
>>>>> I think the spark cluster receives two submits, A and B.
>>>>

Re: Broadcasts & Storage Memory

2017-06-21 Thread satish lalam
My understanding is - it comes from storage memory (the storageFraction region).
Cached blocks here are immune to eviction - so both persisted RDDs and broadcast
variables sit here. Ref
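
To illustrate - a sketch only, and the defaults quoted below are my recollection of the Spark 2.x unified memory manager, so double-check for your version - the two knobs go on the session config, and broadcast blocks then count against the same storage region as cached RDDs on each executor:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("memory-config-sketch")
  .config("spark.memory.fraction", "0.6")        // share of (heap - reserved) for execution + storage
  .config("spark.memory.storageFraction", "0.5") // portion of that share protected for storage
  .getOrCreate()

// Broadcast blocks are tracked by the block manager as storage memory,
// visible under "Storage Memory" on the executors page of the UI.
val lookup = spark.sparkContext.broadcast((1 to 100000).toArray)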



On Wed, Jun 21, 2017 at 1:43 PM, Bryan Jeffrey 
wrote:

> Hello.
>
> Question: Do broadcast variables stored on executors count as part of
> 'storage memory' or other memory?
>
> A little bit more detail:
>
> I understand that we have two knobs to control memory allocation:
> - spark.memory.fraction
> - spark.memory.storageFraction
>
> My understanding is that spark.memory.storageFraction controls the amount
> of memory allocated for cached RDDs.  spark.memory.fraction controls how
> much memory is allocated to Spark operations (task serialization,
> operations, etc.), w/ the remainder reserved for user data structures,
> Spark internal metadata, etc.  This includes the storage memory for cached
> RDDs.
>
> You end up with executor memory that looks like the following:
> All memory: 0-100
> Spark memory: 0-75
> RDD Storage: 0-37
> Other Spark: 38-75
> Other Reserved: 76-100
>
> Where do broadcast variables fall into the mix?
>
> Regards,
>
> Bryan Jeffrey
>


Re: Why my project has this kind of error ?

2017-06-22 Thread satish lalam
Minglei - You could check your JDK path and Scala library settings in the
project structure, i.e., open the project view (Alt + 1), then press F4 to
open Project Structure... and look under SDKs and Libraries.

On Mon, Jun 19, 2017 at 10:54 PM, 张明磊  wrote:

> Hello to all,
>
> Below is my issue. I have already rebuilt and reimported my project in
> IntelliJ IDEA, but it still gives me this kind of error. I can build
> without error with Maven; only IDEA gives me this error. Does anyone
> know what happened with this?
>
>
> Thanks
> Minglei
>
> [inline image (error screenshot) not preserved in the archive]
>


Re: Question about Parallel Stages in Spark

2017-06-27 Thread satish lalam
Thanks All. To reiterate - stages inside a job can be run in parallel as long
as (a) there is no sequential dependency and (b) the job has sufficient
resources.
However, my code was launching 2 jobs and they are sequential, as you
rightly pointed out.
The issue I was trying to highlight with that piece of pseudocode,
however, was that I am observing a job with 2 stages which don't depend on
each other (they both read data from 2 separate tables in a db); they
are both scheduled and both stages get resources - but the 2nd stage really
does not pick up until the 1st stage is complete. It might be due to the db
driver - I will post it to the right forum. Thanks.
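
As an illustration of the multithreaded route discussed in this thread (names below are made up), FAIR mode plus a per-thread pool lets jobs submitted from different threads share the executors instead of queueing:

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("fair-pools-sketch")
  .config("spark.scheduler.mode", "FAIR")
  .getOrCreate()
val sc = spark.sparkContext

// The pool is a thread-local property, so set it on the thread that submits the job.
def inPool[T](pool: String)(body: => T): Future[T] = Future {
  sc.setLocalProperty("spark.scheduler.pool", pool)
  body
}

val a = inPool("pool_a") { sc.parallelize(1 to 1000000).map(_ * 2).count() }
val b = inPool("pool_b") { sc.parallelize(1 to 1000000).map(_ + 1).count() }
Await.result(Future.sequence(Seq(a, b)), Duration.Inf)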

On Mon, Jun 26, 2017 at 9:12 PM, Pralabh Kumar 
wrote:

> I think my words were also misunderstood. My point is that they will not be
> submitted together, since they are part of one thread.
>
> val spark =  SparkSession.builder()
>   .appName("practice")
>   .config("spark.scheduler.mode","FAIR")
>   .enableHiveSupport().getOrCreate()
> val sc = spark.sparkContext
> sc.parallelize(List(1.to(1000))).map(s=>Thread.sleep(1)).collect()
> sc.parallelize(List(1.to(1000))).map(s=>Thread.sleep(1)).collect()
> Thread.sleep(1000)
>
>
> I ran this, and the submit times are different for the two jobs.
>
> Please let me know if I am wrong.
>
> On Tue, Jun 27, 2017 at 9:17 AM, 萝卜丝炒饭 <1427357...@qq.com> wrote:
>
>> My words caused a misunderstanding.
>> Step 1: A is submitted to Spark.
>> Step 2: B is submitted to Spark.
>>
>> Spark gets two independent jobs. The FAIR scheduler is used to schedule A and B.
>>
>> Jeffrey's code did not cause two submits.
>>
>>
>>
>> ---Original---
>> *From:* "Pralabh Kumar"
>> *Date:* 2017/6/27 12:09:27
>> *To:* "萝卜丝炒饭"<1427357...@qq.com>;
>> *Cc:* "user";"satishl";"Bryan
>> Jeffrey";
>> *Subject:* Re: Question about Parallel Stages in Spark
>>
>> Hi
>>
>> I don't think Spark will receive two submits. It will execute one
>> submit and then move to the next one. If the application is
>> multithreaded, and two threads call spark submit at the same time, then
>> they will run in parallel, provided the scheduler is FAIR and task slots are
>> available.
>>
>> But in one thread, one submit will complete and then the next one will
>> start. If there are independent stages in one job, then those will run
>> in parallel.
>>
>> I agree with Bryan Jeffrey .
>>
>>
>> Regards
>> Pralabh Kumar
>>
>> On Tue, Jun 27, 2017 at 9:03 AM, 萝卜丝炒饭 <1427357...@qq.com> wrote:
>>
>>> I think the spark cluster receives two submits, A and B.
>>> The FAIR scheduler is used to schedule A and B.
>>> I am not sure about this.
>>>
>>> ---Original---
>>> *From:* "Bryan Jeffrey"
>>> *Date:* 2017/6/27 08:55:42
>>> *To:* "satishl";
>>> *Cc:* "user";
>>> *Subject:* Re: Question about Parallel Stages in Spark
>>>
>>> Hello.
>>>
>>> The driver is running the individual operations in series, but each
>>> operation is parallelized internally.  If you want them to run in parallel, you
>>> need to give the driver a mechanism to schedule the jobs from separate threads:
>>>
>>> import scala.collection.parallel.mutable.ParArray
>>> import org.apache.spark.rdd.RDD
>>>
>>> val rdd1 = sc.parallelize(1 to 10)
>>> val rdd2 = sc.parallelize(1 to 20)
>>>
>>> var thingsToDo: ParArray[(RDD[Int], Int)] = Array(rdd1, rdd2).zipWithIndex.par
>>>
>>> thingsToDo.foreach { case (rdd, index) =>
>>>   for (i <- 1 to 1)
>>>     logger.info(s"Index ${index} - ${rdd.sum()}")
>>> }
>>>
>>>
>>> This will run both operations in parallel.
>>>
>>>
>>> On Mon, Jun 26, 2017 at 8:10 PM, satishl  wrote:
>>>
 For the below code, since rdd1 and rdd2 don't depend on each other, I was
 expecting that the first and second printlns would be interwoven.
 However, the spark job runs all "first" statements first and then all "second"
 statements, in serial fashion. I have set spark.scheduler.mode = FAIR.
 Obviously my understanding of parallel stages is wrong. What am I
 missing?

 val rdd1 = sc.parallelize(1 to 100)
 val rdd2 = sc.parallelize(1 to 100)

 for (i <- (1 to 100))
   println("first: " + rdd1.sum())
 for (i <- (1 to 100))
   println("second" + rdd2.sum())



 --
 View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Question-about-Parallel-Stages-in-Spark-tp28793.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe e-mail: user-unsubscr...@spark.apache.org


>>>
>>
>


Re: Spark Streaming Design Suggestion

2017-06-14 Thread satish lalam
Agree with Jörn. Dynamically creating/deleting topics is nontrivial to
manage.
With the limited knowledge about your scenario, it appears that you are
using topics as some kind of message-type enum.
If that is the case, you might be better off with one (or just a few)
topics and a message-type field in the Kafka event itself.
Your streaming job can then match incoming events on this field to
choose the right processor for each event, as in the sketch below.
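
A rough sketch of that routing - the Event fields, topic names, and the Kafka source itself are assumptions, and the actual producer writes are left as comments:

import org.apache.spark.streaming.dstream.DStream

case class Event(messageType: String, payload: String)

// `events` would come from the single Kafka input topic; stream creation is omitted here.
def route(events: DStream[Event]): Unit = {
  events.foreachRDD { rdd =>
    rdd.foreach { event =>
      event.messageType match {
        case "type1" => println(s"to output-topic-1: ${event.payload}") // replace with a Kafka producer write
        case "type2" => println(s"to output-topic-2: ${event.payload}")
        case other   => println(s"unknown type $other - route to a dead-letter topic")
      }
    }
  }
}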

On Tue, Jun 13, 2017 at 1:47 PM, Jörn Franke  wrote:

> I do not fully understand the design here.
> Why not send everything to one topic, with some application id in the message,
> and write to one output topic that also indicates the application id?
>
> Can you elaborate a little bit more on the use case?
>
> In particular, applications that delete/create topics dynamically can be a
> nightmare to operate.
>
> > On 13. Jun 2017, at 22:03, Shashi Vishwakarma 
> wrote:
> >
> > Hi
> >
> > I have to design a Spark streaming application with the below use case. I am
> > looking for the best possible approach for this.
> >
> > I have an application pushing data into 1000+ different topics, each with a
> > different purpose. Spark streaming will receive data from each topic
> > and, after processing, it will write back to a corresponding output topic.
> >
> > Ex.
> >
> > Input Type 1 Topic  --> Spark Streaming --> Output Type 1 Topic
> > Input Type 2 Topic  --> Spark Streaming --> Output Type 2 Topic
> > Input Type 3 Topic  --> Spark Streaming --> Output Type 3 Topic
> > .
> > .
> > .
> > Input Type N Topic  --> Spark Streaming --> Output Type N Topic  and so
> on.
> >
> > I need to answer the following questions.
> >
> > 1. Is it a good idea to launch 1000+ Spark streaming applications, one per
> topic? Or should I have one streaming application for all topics, since the
> processing logic is going to be the same?
> > 2. If there is one streaming context, then how will I determine which RDD
> belongs to which Kafka topic, so that after processing I can write it back
> to its corresponding output topic?
> > 3. Clients may add/delete topics in Kafka; how do I handle this dynamically in
> Spark streaming?
> > 4. How do I restart the job automatically on failure?
> >
> > Any other issues you guys see here?
> >
> > I highly appreciate your response.
> >
> > Thanks
> > Shashi
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Read Local File

2017-06-14 Thread satish lalam
I guess you have already made sure that the paths for your file are exactly
the same on each of your nodes. I'd also check the permissions on your path.
I believe the sample code you pasted is only for testing, and you are
already aware that a distributed count on a local file has no benefits.
Once I ran into a similar issue while copy-pasting file paths, probably due
to encoding issues in some text editors. I'd copied a hidden char at the
end of the path from the source file, which made my file lookup fail even
though the code looked perfectly alright. Typing the path explicitly resolved
it. But this is a corner case.

Alternatively - if the file size is small, you could do spark-submit with the
--files option, which will ship the file to every executor so it is available
locally on each of them.
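
A rough sketch of that route - paths and file names below are placeholders, and sc is an existing SparkContext:

// Option A: ship the file at submit time:
//   spark-submit --files /local/path/data.txt --class MyApp myApp.jar
// Option B: distribute it from code and read the node-local copy inside a task:
import org.apache.spark.SparkFiles

sc.addFile("/local/path/data.txt")  // copied to every node

val counts = sc.parallelize(Seq(1)).map { _ =>
  val src = scala.io.Source.fromFile(SparkFiles.get("data.txt"))
  try src.getLines().size finally src.close()
}.collect()
println(s"lines: ${counts.head}")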




On Tue, Jun 13, 2017 at 11:02 AM, Dirceu Semighini Filho <
dirceu.semigh...@gmail.com> wrote:

> Hi all,
> I'm trying to read a file from the local filesystem. I have 4 workstations, 1
> master and 3 slaves, running with Ambari and Yarn, with Spark version
> 2.1.1.2.6.1.0-129
>
> The code that I'm trying to run is quite simple
>
> spark.sqlContext.read.text("file:///pathToFile").count
>
> I've copied the file to all 4 workstations, and every time I try to
> run this I get the following exception:
> 17/06/13 17:57:37 WARN TaskSetManager: Lost task 0.0 in stage 6.0 (TID 12,
> ip, executor 1): java.io.FileNotFoundException: File file:pathToFile does
> not exist
> It is possible the underlying files have been updated. You can explicitly
> invalidate the cache in Spark by running 'REFRESH TABLE tableName' command
> in SQL or by recreating the Dataset/DataFrame involved.
> at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.
> nextIterator(FileScanRDD.scala:175)
> at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(
> FileScanRDD.scala:109)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$
> GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$
> GeneratedIterator.processNext(Unknown Source)
> at org.apache.spark.sql.execution.BufferedRowIterator.
> hasNext(BufferedRowIterator.java:43)
> at org.apache.spark.sql.execution.WholeStageCodegenExec$$
> anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(
> BypassMergeSortShuffleWriter.java:126)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(
> ShuffleMapTask.scala:96)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(
> ShuffleMapTask.scala:53)
> at org.apache.spark.scheduler.Task.run(Task.scala:99)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> 17/06/13 17:57:37 ERROR TaskSetManager: Task 0 in stage 6.0 failed 4
> times; aborting job
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
> in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage
> 6.0 (TID 15, ip, executor 1): java.io.FileNotFoundException: File
> file:file:pathToFile does not exist
> It is possible the underlying files have been updated. You can explicitly
> invalidate the cache in Spark by running 'REFRESH TABLE tableName' command
> in SQL or by recreating the Dataset/DataFrame involved.
> at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.
> nextIterator(FileScanRDD.scala:175)
> at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(
> FileScanRDD.scala:109)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$
> GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$
> GeneratedIterator.processNext(Unknown Source)
> at org.apache.spark.sql.execution.BufferedRowIterator.
> hasNext(BufferedRowIterator.java:43)
> at org.apache.spark.sql.execution.WholeStageCodegenExec$$
> anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(
> BypassMergeSortShuffleWriter.java:126)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(
> ShuffleMapTask.scala:96)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(
> ShuffleMapTask.scala:53)
> at org.apache.spark.scheduler.Task.run(Task.scala:99)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> Driver stacktrace:
>   at