Mean over window with minimum number of rows

2018-10-18 Thread Sumona Routh
Hi all,
Before I go the route of rolling my own UDAF:

I'm calculating a rolling mean over the last 5 rows, so I have the following
window defined:

Window.partitionBy(person).orderBy(timestamp).rowsBetween(-4, Window.currentRow)

Then I calculate the mean over that window.

Within each partition, I'd like the first 4 elements to return null / NaN
because there aren't enough rows to be a true "last 5." This is the
behavior when I do this in pandas using rolling mean. Instead, it appears
to calculate the mean of whatever rows happen to be in the partition, even
if there is only 1 row.
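
The closest workaround I've sketched so far (short of the UDAF) is to count
rows over the same frame and only keep the mean when the frame is full; df
and the "value" column below are just placeholders:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{avg, col, count, lit, when}

val w = Window.partitionBy(col("person")).orderBy(col("timestamp"))
  .rowsBetween(-4, Window.currentRow)

// when() without an otherwise() yields null, so short frames come out null
val withMean = df.withColumn("last5Mean",
  when(count(lit(1)).over(w) === 5, avg(col("value")).over(w)))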

Is there a simple way already in Spark to do this? It seems like a normal
thing so I wonder if I am missing something.

Thanks!
Sumona


Databricks 1/2 day certification course at Spark Summit

2018-05-25 Thread Sumona Routh
Hi all,
My company just approved sending some of us to Spark Summit in SF this year.
Unfortunately, the day-long workshops on Monday are now sold out. We are
considering what we might do instead.

Have others done the 1/2 day certification course before? Is it worth
considering? Does it cover anything in particular around Spark or is it
more of an exam-prep type course? We have people with varying skill levels
-- would this be a waste for newer Spark devs or is there still some good
info to glean?

We may decide to save the budget for other types of local training, but I
wanted to see what others who may have done this course before think.

Thanks!
Sumona


Re: DataFrameReader read from S3 org.apache.spark.sql.AnalysisException: Path does not exist

2017-07-13 Thread Sumona Routh
Yes, which is what I eventually did. I wanted to check whether there was some
"mode"-type option, similar to SaveMode with writers. It appears there
genuinely is no option for this, and it has to be handled by the client using
the exception flow.
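
For anyone who finds this thread later, the exception-flow handling looks
roughly like this (simplified; in practice you may prefer to filter the
missing paths out before the read):

import org.apache.spark.sql.AnalysisException

val df =
  try sparkSession.read.parquet(listOfPaths: _*)
  catch {
    case e: AnalysisException if e.getMessage.contains("Path does not exist") =>
      sparkSession.emptyDataFrame
  }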

Thanks,
Sumona

On Wed, Jul 12, 2017 at 4:59 PM Yong Zhang <java8...@hotmail.com> wrote:

> Can't you just catch that exception and return an empty dataframe?
>
>
> Yong
>
>
> ------
> *From:* Sumona Routh <sumos...@gmail.com>
> *Sent:* Wednesday, July 12, 2017 4:36 PM
> *To:* user
> *Subject:* DataFrameReader read from S3
> org.apache.spark.sql.AnalysisException: Path does not exist
>
> Hi there,
> I'm trying to read a list of paths from S3 into a dataframe for a window
> of time using the following:
>
> sparkSession.read.parquet(listOfPaths:_*)
>
> In some cases, the path may not be there because there is no data, which
> is an acceptable scenario.
> However, Spark throws an AnalysisException: Path does not exist. Is there
> an option I can set to tell it to gracefully return an empty dataframe if a
> particular path is missing? Looking at the spark code, there is an option
> checkFilesExist, but I don't believe that is set in the particular flow of
> code that I'm accessing.
>
> Thanks!
> Sumona
>
>


DataFrameReader read from S3 org.apache.spark.sql.AnalysisException: Path does not exist

2017-07-12 Thread Sumona Routh
Hi there,
I'm trying to read a list of paths from S3 into a dataframe for a window of
time using the following:

sparkSession.read.parquet(listOfPaths:_*)

In some cases, the path may not be there because there is no data, which is
an acceptable scenario.
However, Spark throws an AnalysisException: Path does not exist. Is there
an option I can set to tell it to gracefully return an empty dataframe if a
particular path is missing? Looking at the spark code, there is an option
checkFilesExist, but I don't believe that is set in the particular flow of
code that I'm accessing.

Thanks!
Sumona


Re: Random Forest hangs without trace of error

2017-05-30 Thread Sumona Routh
Hi Morten,
Were you able to resolve your issue with RandomForest? I am having similar
issues with a newly trained model (one with a larger number of trees and a
smaller minInstancesPerNode, which is by design to produce the
best-performing model).
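
For concreteness, the kind of configuration I mean is along these lines (the
actual values and column names differ):

import org.apache.spark.ml.classification.RandomForestClassifier

val rf = new RandomForestClassifier()
  .setNumTrees(200)            // larger number of trees
  .setMinInstancesPerNode(1)   // smaller minInstancesPerNode
  .setMaxDepth(5)
  .setMaxBins(32)
  .setLabelCol("label")
  .setFeaturesCol("features")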

I wanted to get some feedback on how you solved your issue before I post a
separate question.

Thanks!
Sumona

On Sun, Dec 11, 2016 at 4:10 AM Marco Mistroni  wrote:

> OK. Did u change spark version? Java/scala/python version?
> Have u tried with different versions of any of the above?
> Hope this helps
> Kr
>
> On 10 Dec 2016 10:37 pm, "Morten Hornbech"  wrote:
>
>> I haven’t actually experienced any non-determinism. We have nightly
>> integration tests comparing output from random forests with no variations.
>>
>> The workaround we will probably try is to split the dataset, either
>> randomly or on one of the variables, and then train a forest on each
>> partition, which should then be sufficiently small.
>>
>> I hope to be able to provide a good repro case in some weeks. If the
>> problem was in our own code I will also post it in this thread.
>>
>> Morten
>>
>> Den 10. dec. 2016 kl. 23.25 skrev Marco Mistroni :
>>
>> Hello Morten
>> ok.
>> afaik there is a tiny bit of randomness in these ML algorithms (pls
>> anyone correct me if i m wrong).
>> In fact if you run your RDF code multiple times, it will not give you
>> EXACTLY the same results (though accuracy and errors should be more or less
>> similar)... at least this is what i found when playing around with
>> RDF and decision trees and other ML algorithms
>>
>> If RDF is not a must for your use case, could you try to 'scale back' to
>> Decision Trees and see if you still get intermittent failures?
>> This would at least exclude issues with the data.
>>
>> hth
>>  marco
>>
>> On Sat, Dec 10, 2016 at 5:20 PM, Morten Hornbech 
>> wrote:
>>
>>> Already did. There are no issues with smaller samples. I am running this
>>> in a cluster of three t2.large instances on aws.
>>>
>>> I have tried to find the threshold where the error occurs, but it is not
>>> a single factor causing it. Input size and subsampling rate seems to be
>>> most significant, and number of trees the least.
>>>
>>> I have also tried running on a test frame of randomized numbers with the
>>> same number of rows, and could not reproduce the problem here.
>>>
>>> By the way maxDepth is 5 and maxBins is 32.
>>>
>>> I will probably need to leave this for a few weeks to focus on more
>>> short-term stuff, but I will write here if I solve it or reproduce it more
>>> consistently.
>>>
>>> Morten
>>>
>>> Den 10. dec. 2016 kl. 17.29 skrev Marco Mistroni :
>>>
>>> Hi
>>> Bring back samples to the 1k range to debug... or, as suggested, reduce
>>> trees and bins. I had RDF running on same-size data with no issues... or
>>> send me some sample code and data and I'll try it out on my ec2 instance ...
>>> Kr
>>>
>>> On 10 Dec 2016 3:16 am, "Md. Rezaul Karim" <
>>> rezaul.ka...@insight-centre.org> wrote:
>>>
 I had a similar experience last week; I could not even find an error
 trace.

 Later on, I did the following to get rid of the problem:
 i) I downgraded to Spark 2.0.0
 ii) Decreased the value of maxBins and maxDepth

 Additionally, make sure that you set the featureSubsetStrategy as
 "auto" to let the algorithm choose the best feature subset strategy
 for your data. Finally, set the impurity as "gini" for the information
 gain.

 However, setting the number of trees to just 1 gives you neither the real
 advantage of the forest nor better predictive performance.



 Best,
 Karim


 On Dec 9, 2016 11:29 PM, "mhornbech"  wrote:

> Hi
>
> I have spent quite some time trying to debug an issue with the Random
> Forest
> algorithm on Spark 2.0.2. The input dataset is relatively large at
> around
> 600k rows and 200MB, but I use subsampling to make each tree
> manageable.
> However even with only 1 tree and a low sample rate of 0.05 the job
> hangs at
> one of the final stages (see attached). I have checked the logs on all
> executors and the driver and find no traces of error. Could it be a
> memory
> issue even though no error appears? The error does seem sporadic to
> some
> extent so I also wondered whether it could be a data issue, that only
> occurs
> if the subsample includes the bad data rows.
>
> Please comment if you have a clue.
>
> Morten
>
> <
> http://apache-spark-user-list.1001560.n3.nabble.com/file/n28192/Sk%C3%A6rmbillede_2016-12-10_kl.png
> >
>
>
>

Re: What is the best way to run a scheduled spark batch job on AWS EC2 ?

2017-04-11 Thread Sumona Routh
Hi Sam,
I would absolutely be interested in reading a blog write-up of how you are
doing this. We have pieced together a relatively decent pipeline ourselves
in Jenkins, but have many kinks to work out. We also have some new
requirements to start running side-by-side comparisons of different
versions of the Spark job, so this introduces some additional complexity
for us and opens up the opportunity to redesign this approach.

To answer the question of "what if Jenkins is down," we simply added our
Jenkins to our application monitoring (AppD currently, New Relic in the
future) and CloudWatch to monitor the instance. We manage our own Jenkins
box, so there are no hoops to jump through to do what we want in this respect.

Thanks!
Sumona

On Tue, Apr 11, 2017 at 7:20 AM Sam Elamin  wrote:

> Hi Steve
>
>
> Thanks for the detailed response, I think this problem doesn't have an
> industry standard solution as of yet and I am sure a lot of people would
> benefit from the discussion
>
> I realise now what you are saying so thanks for clarifying, that said let
> me try and explain how we approached the problem
>
> There are 2 problems you highlighted: the first is moving the code from
> SCM to prod, and the other is ensuring the data your code uses is correct
> (using the latest data from prod).
>
>
> *"how do you get your code from SCM into production?"*
>
> We currently have our pipeline being run via airflow, we have our dags in
> S3, with regards to how we get our code from SCM to production
>
> 1) Jenkins build that builds our spark applications and runs tests
> 2) Once the first build is successful we trigger another build to copy the
> dags to an s3 folder
>
> We then routinely sync this folder to the local airflow dags folder every
> X minutes
>
> Re test data
> *" but what's your strategy for test data: that's always the troublespot."*
>
> Our application is using versioning against the data, so we expect the
> source data to be in a certain version and the output data to also be in a
> certain version
>
> We have a test resources folder that follows the same versioning
> convention - this is the data that our application tests use - to ensure
> that the data is in the correct format
>
> So, for example, if we have Table X version 1 that depends on data from
> Tables A and B (also version 1), we run our Spark application and then
> ensure the transformed Table X has the correct columns and row values
>
> Then, when we have a new version 2 of the source data or add a new column
> to Table X (version 2), we generate a new version of the data and ensure
> the tests are updated
>
> That way we ensure any new version of the data has tests against it
>
> *"I've never seen any good strategy there short of "throw it at a copy of
> the production dataset"."*
>
> I agree which is why we have a sample of the production data and version
> the schemas we expect the source and target data to look like.
>
> If people are interested I am happy writing a blog about it in the hopes
> this helps people build more reliable pipelines
>
> Kind Regards
> Sam
>
>
>
>
>
>
>
>
>
>
> On Tue, Apr 11, 2017 at 11:31 AM, Steve Loughran 
> wrote:
>
>
> On 7 Apr 2017, at 18:40, Sam Elamin  wrote:
>
> Definitely agree with Gourav there. I wouldn't want Jenkins to run my
> workflow. Seems to me that you would only be using Jenkins for its
> scheduling capabilities
>
>
> Maybe I was just looking at this differently
>
> Yes you can run tests but you wouldn't want it to run your orchestration
> of jobs
>
> What happens if Jenkins goes down for any particular reason? How do you
> have the conversation with your stakeholders that your pipeline is not
> working and they don't have data because the build server is going through
> an upgrade?
>
>
>
> Well, I wouldn't use it as a replacement for Oozie, but I'd certainly
> consider it as the pipeline for getting your code out to the cluster, so you
> don't have to explain why you just pushed out something broken
>
> As example, here's Renault's pipeline as discussed last week in Munich
> https://flic.kr/p/Tw3Emu
>
> However, to be fair, I understand what you are saying, Steve: if you are
> in a place where you only have access to Jenkins and have to go through
> hoops to set up or get access to new instances, then engineers will do what
> they always do and find ways to game the system to get their work done
>
>
>
>
> This isn't about trying to "Game the system", this is about what makes a
> replicable workflow for getting code into production, either at the press
> of a button or as part of a scheduled "we push out an update every night,
> rerun the deployment tests and then switch over to the new installation"
> mech.
>
> Put differently: how do you get your code from SCM into production? Not
> just for CI, but what's your strategy for test data: that's always the
> troublespot. Random selection of rows may 

Re: Dataframes na fill with empty list

2017-04-11 Thread Sumona Routh
For some reason my pasted screenshots were removed when I sent the email
(at least that's how it appeared on my end). Repasting as text below.

The sequence you are referring to represents the list of column names to
fill. I am asking about filling a column which is of type list with an
empty list.

Here is a quick example of what I am doing:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.collect_list

case class IntPair(key: String, value: Int)

val spark =
  SparkSession.builder().master("local[*]").appName("test").getOrCreate()
import spark.implicits._

val list = List(IntPair(key = "a", value = 1),
  IntPair(key = "a", value = 2),
  IntPair(key = "b", value = 2))
val df = spark.createDataset(list).toDF
df.show
val collectList = df.groupBy($"key").agg(collect_list("value") as "listylist")
collectList.show
collectList.printSchema()
collectList.na.fill(Array(), Seq("listylist")) // does not compile: fill has no overload for arrays


The output of the show and printSchema for the collectList df:

+---+---------+
|key|listylist|
+---+---------+
|  b|      [2]|
|  a|   [1, 2]|
+---+---------+

root
 |-- key: string (nullable = true)
 |-- listylist: array (nullable = true)
 |    |-- element: integer (containsNull = true)


So, the last line which doesn't compile is what I would want to do (after
outer joining of course, it's not necessary except in that particular case
where a null could be populated in that field).
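
One direction I've been playing with (no luck so far with a plain SQL
coalesce, but a UDF-backed empty array may work) looks like the sketch
below, applied after the outer join instead of na.fill:

import org.apache.spark.sql.functions.{coalesce, udf}

val emptyIntList = udf(() => Seq.empty[Int])
val filled = collectList.withColumn("listylist",
  coalesce($"listylist", emptyIntList()))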

Thanks,
Sumona



On Tue, Apr 11, 2017 at 9:50 AM Sumona Routh <sumos...@gmail.com> wrote:

> The sequence you are referring to represents the list of column names to
> fill. I am asking about filling a column which is of type list with an
> empty list.
>
> Here is a quick example of what I am doing:
>
>
> The output of the show and printSchema for the collectList df:
>
>
>
> So, the last line which doesn't compile is what I would want to do (after
> outer joining of course, it's not necessary except in that particular case
> where a null could be populated in that field).
>
> Thanks,
> Sumona
>
> On Tue, Apr 11, 2017 at 2:02 AM Didac Gil <didacgil9...@gmail.com> wrote:
>
> It does support it, at least in 2.0.2 as I am running:
>
> Here one example:
>
> val parsedLines = stream_of_logs
>   .map(line => p.parseRecord_viaCSVParser(line))
>   .join(appsCateg,$"Application"===$"name","left_outer")
>   .drop("id")
>   .na.fill(0, Seq("numeric_field1","numeric_field2"))
>   .na.fill("", Seq(
>    "text_field1","text_field2","text_field3"))
>
>
> Notice that you have to differentiate those fields that are meant to be
> filled with an int, from those that require a different value, an empty
> string in my case.
>
> On 11 Apr 2017, at 03:18, Sumona Routh <sumos...@gmail.com> wrote:
>
> Hi there,
> I have two dataframes that each have some columns which are of list type
> (array generated by the collect_list function actually).
>
> I need to outer join these two dfs, however by nature of an outer join I
> am sometimes left with null values. Normally I would use df.na.fill(...),
> however it appears the fill function doesn't support this data type.
>
> Can anyone recommend an alternative? I have also been playing around with
> coalesce in a sql expression, but I'm not having any luck here either.
>
> Obviously, I can do a null check on the fields downstream, however it is
> not in the spirit of scala to pass around nulls, so I wanted to see if I
> was missing another approach first.
>
> Thanks,
> Sumona
>
> I am using Spark 2.0.2
>
> Didac Gil de la Iglesia
> PhD in Computer Science
> didacg...@gmail.com
> Spain: +34 696 285 544 <+34%20696%2028%2055%2044>
> Sweden: +46 (0)730229737 <+46%2073%20022%2097%2037>
> Skype: didac.gil.de.la.iglesia
>
> On 11 Apr 2017, at 03:18, Sumona Routh <sumos...@gmail.com> wrote:
>
> Hi there,
> I have two dataframes that each have some columns which are of list type
> (array generated by the collect_list function actually).
>
> I need to outer join these two dfs, however by nature of an outer join I
> am sometimes left with null values. Normally I would use df.na.fill(...),
> however it appears the fill function doesn't support this data type.
>
> Can anyone recommend an alternative? I have also been playing around with
> coalesce in a sql expression, but I'm not having any luck here either.
>
> Obviously, I can do a null check on the fields downstream, however it is
> not in the spirit of scala to pass around nulls, so I wanted to see if I
> was missing another approach first.
>
> Thanks,
> Sumona
>
> I am using Spark 2.0.2
>
>
> Didac Gil de la Iglesia
> PhD in Computer Science
> didacg...@gmail.com
> Spain: +34 696 285 544 <+34%20696%2028%2055%2044>
> Sweden: +46 (0)730229737 <+46%2073%20022%2097%2037>
> Skype: didac.gil.de.la.iglesia
>
>


Re: Dataframes na fill with empty list

2017-04-11 Thread Sumona Routh
The sequence you are referring to represents the list of column names to
fill. I am asking about filling a column which is of type list with an
empty list.

Here is a quick example of what I am doing:


The output of the show and printSchema for the collectList df:



So, the last line which doesn't compile is what I would want to do (after
outer joining of course, it's not necessary except in that particular case
where a null could be populated in that field).

Thanks,
Sumona

On Tue, Apr 11, 2017 at 2:02 AM Didac Gil <didacgil9...@gmail.com> wrote:

> It does support it, at least in 2.0.2 as I am running:
>
> Here one example:
>
> val parsedLines = stream_of_logs
>   .map(line => p.parseRecord_viaCSVParser(line))
>   .join(appsCateg,$"Application"===$"name","left_outer")
>   .drop("id")
>   .na.fill(0, Seq("numeric_field1","numeric_field2"))
>   .na.fill("", Seq(
>    "text_field1","text_field2","text_field3"))
>
>
> Notice that you have to differentiate those fields that are meant to be
> filled with an int, from those that require a different value, an empty
> string in my case.
>
> On 11 Apr 2017, at 03:18, Sumona Routh <sumos...@gmail.com> wrote:
>
> Hi there,
> I have two dataframes that each have some columns which are of list type
> (array generated by the collect_list function actually).
>
> I need to outer join these two dfs, however by nature of an outer join I
> am sometimes left with null values. Normally I would use df.na.fill(...),
> however it appears the fill function doesn't support this data type.
>
> Can anyone recommend an alternative? I have also been playing around with
> coalesce in a sql expression, but I'm not having any luck here either.
>
> Obviously, I can do a null check on the fields downstream, however it is
> not in the spirit of scala to pass around nulls, so I wanted to see if I
> was missing another approach first.
>
> Thanks,
> Sumona
>
> I am using Spark 2.0.2
>
> Didac Gil de la Iglesia
> PhD in Computer Science
> didacg...@gmail.com
> Spain: +34 696 285 544 <+34%20696%2028%2055%2044>
> Sweden: +46 (0)730229737 <+46%2073%20022%2097%2037>
> Skype: didac.gil.de.la.iglesia
>
> On 11 Apr 2017, at 03:18, Sumona Routh <sumos...@gmail.com> wrote:
>
> Hi there,
> I have two dataframes that each have some columns which are of list type
> (array generated by the collect_list function actually).
>
> I need to outer join these two dfs, however by nature of an outer join I
> am sometimes left with null values. Normally I would use df.na.fill(...),
> however it appears the fill function doesn't support this data type.
>
> Can anyone recommend an alternative? I have also been playing around with
> coalesce in a sql expression, but I'm not having any luck here either.
>
> Obviously, I can do a null check on the fields downstream, however it is
> not in the spirit of scala to pass around nulls, so I wanted to see if I
> was missing another approach first.
>
> Thanks,
> Sumona
>
> I am using Spark 2.0.2
>
>
> Didac Gil de la Iglesia
> PhD in Computer Science
> didacg...@gmail.com
> Spain: +34 696 285 544 <+34%20696%2028%2055%2044>
> Sweden: +46 (0)730229737 <+46%2073%20022%2097%2037>
> Skype: didac.gil.de.la.iglesia
>
>


Dataframes na fill with empty list

2017-04-10 Thread Sumona Routh
Hi there,
I have two dataframes that each have some columns which are of list type
(array generated by the collect_list function actually).

I need to outer join these two dfs, however by nature of an outer join I am
sometimes left with null values. Normally I would use df.na.fill(...),
however it appears the fill function doesn't support this data type.

Can anyone recommend an alternative? I have also been playing around with
coalesce in a sql expression, but I'm not having any luck here either.

Obviously, I can do a null check on the fields downstream, however it is
not in the spirit of scala to pass around nulls, so I wanted to see if I
was missing another approach first.

Thanks,
Sumona

I am using Spark 2.0.2


Re: Can't load a RandomForestClassificationModel in Spark job

2017-01-12 Thread Sumona Routh
Yes, I save it to S3 in a different process. It is actually the
RandomForestClassificationModel.load method (passed an s3 path) where I run
into problems.
When you say you load it during map stages, do you mean that you are able
to directly load a model from inside of a transformation? When I try this,
it passes the function to a worker, and the load method itself appears to
attempt to create a new SparkContext, which causes an NPE downstream
(because creating a SparkContext on the worker is not an appropriate thing
to do, according to various threads I've found).

Maybe there is a different load function I should be using?
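
One direction I'm considering is to load every profile's model up front on
the driver and only call transform per dataset, instead of loading inside a
map function on the executors. A rough sketch (paths, profile names, and the
per-profile DataFrames are hypothetical):

import org.apache.spark.ml.classification.RandomForestClassificationModel

val modelPaths = Map(
  "profileA" -> "s3://my-bucket/models/profileA",
  "profileB" -> "s3://my-bucket/models/profileB")
val models = modelPaths.map { case (profile, path) =>
  profile -> RandomForestClassificationModel.load(path)   // runs on the driver
}

val scoredA = models("profileA").transform(dfForProfileA)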

Thanks!
Sumona

On Thu, Jan 12, 2017 at 6:26 PM ayan guha <guha.a...@gmail.com> wrote:

> Hi
>
> Given training and predictions are two different applications, I typically
> save model objects to hdfs and load it back during prediction map stages.
>
> Best
> Ayan
>
> On Fri, 13 Jan 2017 at 5:39 am, Sumona Routh <sumos...@gmail.com> wrote:
>
> Hi all,
> I've been working with Spark mllib 2.0.2 RandomForestClassificationModel.
>
> I encountered two frustrating issues and would really appreciate some
> advice:
>
> 1)  RandomForestClassificationModel is effectively not serializable (I
> assume it's referencing something that can't be serialized, since it itself
> extends serializable), so I ended up with the well-known exception:
> org.apache.spark.SparkException: Task not serializable.
> Basically, my original intention was to pass the model as a parameter
> because which model we use is dynamic based on what record we are
> predicting on.
>
> Has anyone else encountered this? Is this currently being addressed? I
> would expect objects from Spark's own libraries to be usable seamlessly
> in their applications without these types of exceptions.
>
> 2) The RandomForestClassificationModel.load method appears to hang
> indefinitely when executed from inside a map function (which I assume is
> passed to the executor). So, I basically cannot load a model from a worker.
> We have multiple "profiles" that use differently trained models, which are
> accessed from within a map function to run predictions on different sets of
> data.
> The thread that is hanging has this as the latest (most pertinent) code:
>
> org.apache.spark.ml.util.DefaultParamsReader$.loadMetadata(ReadWrite.scala:391)
> Looking at the code in github, it appears that it is calling sc.textFile.
> I could not find anything stating that this particular function would not
> work from within a map function.
>
> Are there any suggestions as to how I can get this model to work on a real
> production job (either by allowing it to be serializable and passed around
> or loaded from a worker)?
>
> I've extensively POCed this model (saving, loading, transforming,
> training, etc.), however this is the first time I'm attempting to use it
> from within a real application.
>
> Sumona
>
>


Can't load a RandomForestClassificationModel in Spark job

2017-01-12 Thread Sumona Routh
Hi all,
I've been working with Spark mllib 2.0.2 RandomForestClassificationModel.

I encountered two frustrating issues and would really appreciate some
advice:

1)  RandomForestClassificationModel is effectively not serializable (I
assume it's referencing something that can't be serialized, since it itself
extends serializable), so I ended up with the well-known exception:
org.apache.spark.SparkException: Task not serializable.
Basically, my original intention was to pass the model as a parameter
because which model we use is dynamic based on what record we are
predicting on.

Has anyone else encountered this? Is this currently being addressed? I
would expect objects from Spark's own libraries to be usable seamlessly in
their applications without these types of exceptions.

2) The RandomForestClassificationModel.load method appears to hang
indefinitely when executed from inside a map function (which I assume is
passed to the executor). So, I basically cannot load a model from a worker.
We have multiple "profiles" that use differently trained models, which are
accessed from within a map function to run predictions on different sets of
data.
The thread that is hanging has this as the latest (most pertinent) code:
org.apache.spark.ml.util.DefaultParamsReader$.loadMetadata(ReadWrite.scala:391)
Looking at the code in github, it appears that it is calling sc.textFile. I
could not find anything stating that this particular function would not
work from within a map function.

Are there any suggestions as to how I can get this model to work on a real
production job (either by allowing it to be serializable and passed around
or loaded from a worker)?

I've extensively POCed this model (saving, loading, transforming, training,
etc.), however this is the first time I'm attempting to use it from within
a real application.

Sumona


Re: Upgrade from 1.2 to 1.6 - parsing flat files in working directory

2016-07-26 Thread Sumona Routh
Can anyone provide some guidance on how to get files on the classpath for
our Spark job? This used to work in 1.2, however after upgrading we are
getting nulls when attempting to load resources.

Thanks,
Sumona

On Thu, Jul 21, 2016 at 4:43 PM Sumona Routh <sumos...@gmail.com> wrote:

> Hi all,
> We are running into a classpath issue when we upgrade our application from
> 1.2 to 1.6.
>
> In 1.2, we load properties from a flat file (from working directory of the
> spark-submit script) using classloader resource approach. This was executed
> up front (by the driver) before any processing happened.
>
>  val confStream =
> Thread.currentThread().getContextClassLoader().getResourceAsStream(appConfigPath)
>
> confProperties.load(confStream)
>
> In 1.6, the line getResourceAsStream returns a null and thus causes a
> subsequent NullPointerException when loading the properties.
>
> How do we pass flat files (there are many, so we really want to add a
> directory to the classpath) in Spark 1.6? We haven't had much luck with
> --files and --driver-class-path and spark.driver.extraClassPath. We also
> couldn't find much documentation on this.
>
> Thanks!
> Sumona
>


Upgrade from 1.2 to 1.6 - parsing flat files in working directory

2016-07-21 Thread Sumona Routh
Hi all,
We are running into a classpath issue when we upgrade our application from
1.2 to 1.6.

In 1.2, we load properties from a flat file (from working directory of the
spark-submit script) using classloader resource approach. This was executed
up front (by the driver) before any processing happened.

 val confStream =
Thread.currentThread().getContextClassLoader().getResourceAsStream(appConfigPath)

confProperties.load(confStream)

In 1.6, the line getResourceAsStream returns a null and thus causes a
subsequent NullPointerException when loading the properties.

How do we pass flat files (there are many, so we really want to add a
directory to the classpath) in Spark 1.6? We haven't had much luck with
--files and --driver-class-path and spark.driver.extraClassPath. We also
couldn't find much documentation on this.
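
One direction we're experimenting with is to ship the file with --files and
resolve it through SparkFiles instead of the classloader; a rough sketch,
where "app.conf" is a placeholder name:

import java.io.FileInputStream
import java.util.Properties
import org.apache.spark.SparkFiles

val confProperties = new Properties()
val in = new FileInputStream(SparkFiles.get("app.conf"))
try confProperties.load(in) finally in.close()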

Thanks!
Sumona


Spark UI shows finished when job had an error

2016-06-17 Thread Sumona Routh
Hi there,
Our Spark job had an error (specifically, the Cassandra table definition
did not match what was in Cassandra), which threw an exception that was
logged to our spark-submit log.
However, the UI never showed any failed stage or job. It appeared as if the
job finished without error, which is not correct.

We are trying to define our monitoring for our scheduled jobs, and we
intended to use the Spark UI to catch issues. Can we explain why the UI
would not report an exception like this? Is there a better approach we
should use for tracking failures in a Spark job?
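
Our current fallback, absent a better answer, is to surface failures through
the driver's exit code rather than the UI; runJob below is just a
placeholder for the actual work:

object OurJob {
  def main(args: Array[String]): Unit = {
    try {
      runJob()
    } catch {
      case e: Throwable =>
        e.printStackTrace()
        sys.exit(1)   // non-zero exit lets the scheduler flag the run as failed
    }
  }

  private def runJob(): Unit = ???   // placeholder for the real Spark job
}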

We are currently on 1.2 standalone, however we do intend to upgrade to 1.6
shortly.

Thanks!
Sumona


Re: Spark UI standalone "crashes" after an application finishes

2016-03-01 Thread Sumona Routh
Thanks Shixiong!
To clarify for others, yes, I was speaking of the UI at port 4040, and I do
have event logging enabled, so I can review jobs after the fact. We hope to
upgrade our version of Spark soon, so I'll write back if that resolves it.
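
For anyone searching later, "event logging enabled" here just means the
standard settings, roughly as below (the app name and log directory are
placeholders):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("our-app")
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.dir", "hdfs:///spark-events")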

Sumona

On Mon, Feb 29, 2016 at 8:27 PM Sea <261810...@qq.com> wrote:

> Hi, Sumona:
>   This is a bug in older Spark versions; it is fixed in Spark 1.6.0.
>   After the application completes, the Spark master loads the event log into
> memory, and it does so synchronously because of the actor model. If the event
> log is big, the master will hang for a long time and you cannot submit any
> applications; if your master's memory is too small, your master will die!
>   The fix in Spark 1.6 is not ideal: the operation is async, so you still
> need to set a big Java heap for the master.
>
>
>
> -- Original Message --
> *From:* "Shixiong(Ryan) Zhu";<shixi...@databricks.com>;
> *Sent:* Tuesday, March 1, 2016, 8:02 AM
> *To:* "Sumona Routh"<sumos...@gmail.com>;
> *Cc:* "user@spark.apache.org"<user@spark.apache.org>;
> *Subject:* Re: Spark UI standalone "crashes" after an application finishes
>
> Do you mean you cannot access Master UI after your application completes?
> Could you check the master log?
>
> On Mon, Feb 29, 2016 at 3:48 PM, Sumona Routh <sumos...@gmail.com> wrote:
>
>> Hi there,
>> I've been doing some performance tuning of our Spark application, which
>> is using Spark 1.2.1 standalone. I have been using the spark metrics to
>> graph out details as I run the jobs, as well as the UI to review the tasks
>> and stages.
>>
>> I notice that after my application completes, or is near completion, the
>> UI "crashes." I get a Connection Refused response. Sometimes, the page
>> eventually recovers and will load again, but sometimes I end up having to
>> restart the Spark master to get it back. When I look at my graphs on the
>> app, the memory consumption (of driver, executors, and what I believe to be
>> the daemon (spark.jvm.total.used)) appears to be healthy. Monitoring the
>> master machine itself, memory and CPU appear healthy as well.
>>
>> Has anyone else seen this issue? Are there logs for the UI itself, and
>> where might I find those?
>>
>> Thanks!
>> Sumona
>>
>
>


Spark UI standalone "crashes" after an application finishes

2016-02-29 Thread Sumona Routh
Hi there,
I've been doing some performance tuning of our Spark application, which is
using Spark 1.2.1 standalone. I have been using the spark metrics to graph
out details as I run the jobs, as well as the UI to review the tasks and
stages.

I notice that after my application completes, or is near completion, the UI
"crashes." I get a Connection Refused response. Sometimes, the page
eventually recovers and will load again, but sometimes I end up having to
restart the Spark master to get it back. When I look at my graphs on the
app, the memory consumption (of driver, executors, and what I believe to be
the daemon (spark.jvm.total.used)) appears to be healthy. Monitoring the
master machine itself, memory and CPU appear healthy as well.

Has anyone else seen this issue? Are there logs for the UI itself, and
where might I find those?

Thanks!
Sumona


Re: SparkListener onApplicationEnd processing an RDD throws exception because of stopped SparkContext

2016-02-22 Thread Sumona Routh
Ok, I understand.

Yes, I will have to handle them in the main thread.
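
i.e. something along these lines, where doJob and doPostProcessing are just
placeholders for the real work and the Cassandra save of the summary data:

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("our-app"))
try {
  val summary = doJob(sc)
  doPostProcessing(summary)
} finally {
  sc.stop()   // onApplicationEnd listeners only fire during stop()
}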

Thanks!
Sumona



On Wed, Feb 17, 2016 at 12:24 PM Shixiong(Ryan) Zhu <shixi...@databricks.com>
wrote:

> `onApplicationEnd` is posted when SparkContext is stopping, and you cannot
> submit any job to a stopping SparkContext. In general, SparkListener is
> used to monitor the job progress and collect job information, an you should
> not submit jobs there. Why not submit your jobs in the main thread?
>
> On Wed, Feb 17, 2016 at 7:11 AM, Sumona Routh <sumos...@gmail.com> wrote:
>
>> Can anyone provide some insight into the flow of SparkListeners,
>> specifically onApplicationEnd? I'm having issues with the SparkContext
>> being stopped before my final processing can complete.
>>
>> Thanks!
>> Sumona
>>
>> On Mon, Feb 15, 2016 at 8:59 AM Sumona Routh <sumos...@gmail.com> wrote:
>>
>>> Hi there,
>>> I am trying to implement a listener that performs as a post-processor
>>> which stores data about what was processed or erred. With this, I use an
>>> RDD that may or may not change during the course of the application.
>>>
>>> My thought was to use onApplicationEnd and then saveToCassandra call to
>>> persist this.
>>>
>>> From what I've gathered in my experiments,  onApplicationEnd  doesn't
>>> get called until sparkContext.stop() is called. If I don't call stop in my
>>> code, the listener won't be called. This works fine on my local tests -
>>> stop gets called, the listener is called and then persisted to the db, and
>>> everything works fine. However when I run this on our server,  the code in
>>> onApplicationEnd throws the following exception:
>>>
>>> Task serialization failed: java.lang.IllegalStateException: Cannot call
>>> methods on a stopped SparkContext
>>>
>>> What's the best way to resolve this? I can think of creating a new
>>> SparkContext in the listener (I think I have to turn on allowing multiple
>>> contexts, in case I try to create one before the other one is stopped). It
>>> seems odd but might be doable. Additionally, what if I were to simply add
>>> the code into my job in some sort of procedural block: doJob,
>>> doPostProcessing, does that guarantee postProcessing will occur after the
>>> other?
>>>
>>> We are currently using Spark 1.2 standalone at the moment.
>>>
>>> Please let me know if you require more details. Thanks for the
>>> assistance!
>>> Sumona
>>>
>>>
>


Re: SparkListener onApplicationEnd processing an RDD throws exception because of stopped SparkContext

2016-02-17 Thread Sumona Routh
Can anyone provide some insight into the flow of SparkListeners,
specifically onApplicationEnd? I'm having issues with the SparkContext
being stopped before my final processing can complete.

Thanks!
Sumona

On Mon, Feb 15, 2016 at 8:59 AM Sumona Routh <sumos...@gmail.com> wrote:

> Hi there,
> I am trying to implement a listener that performs as a post-processor
> which stores data about what was processed or erred. With this, I use an
> RDD that may or may not change during the course of the application.
>
> My thought was to use onApplicationEnd and then saveToCassandra call to
> persist this.
>
> From what I've gathered in my experiments,  onApplicationEnd  doesn't get
> called until sparkContext.stop() is called. If I don't call stop in my
> code, the listener won't be called. This works fine on my local tests -
> stop gets called, the listener is called and then persisted to the db, and
> everything works fine. However when I run this on our server,  the code in
> onApplicationEnd throws the following exception:
>
> Task serialization failed: java.lang.IllegalStateException: Cannot call
> methods on a stopped SparkContext
>
> What's the best way to resolve this? I can think of creating a new
> SparkContext in the listener (I think I have to turn on allowing multiple
> contexts, in case I try to create one before the other one is stopped). It
> seems odd but might be doable. Additionally, what if I were to simply add
> the code into my job in some sort of procedural block: doJob,
> doPostProcessing, does that guarantee postProcessing will occur after the
> other?
>
> We are currently using Spark 1.2 standalone at the moment.
>
> Please let me know if you require more details. Thanks for the assistance!
> Sumona
>
>


SparkListener onApplicationEnd processing an RDD throws exception because of stopped SparkContext

2016-02-15 Thread Sumona Routh
Hi there,
I am trying to implement a listener that performs as a post-processor which
stores data about what was processed or erred. With this, I use an RDD that
may or may not change during the course of the application.

My thought was to use onApplicationEnd and then saveToCassandra call to
persist this.

From what I've gathered in my experiments, onApplicationEnd doesn't get
called until sparkContext.stop() is called. If I don't call stop in my
code, the listener won't be called. This works fine on my local tests -
stop gets called, the listener is called and then persisted to the db, and
everything works fine. However when I run this on our server,  the code in
onApplicationEnd throws the following exception:

Task serialization failed: java.lang.IllegalStateException: Cannot call
methods on a stopped SparkContext

What's the best way to resolve this? I can think of creating a new
SparkContext in the listener (I think I have to turn on allowing multiple
contexts, in case I try to create one before the other one is stopped). It
seems odd but might be doable. Additionally, what if I were to simply add
the code into my job in some sort of procedural block: doJob,
doPostProcessing, does that guarantee postProcessing will occur after the
other?

We are currently using Spark 1.2 standalone at the moment.

Please let me know if you require more details. Thanks for the assistance!
Sumona


SparkListener - why is org.apache.spark.scheduler.JobFailed in scala private?

2016-02-10 Thread Sumona Routh
Hi there,
I am trying to create a listener for my Spark job to do some additional
notifications for failures using this Scala API:
https://spark.apache.org/docs/1.2.1/api/scala/#org.apache.spark.scheduler.JobResult
.

My idea was to write something like this:

override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
  jobEnd.jobResult match {
    case JobFailed(exception) => // do stuff here
  }
}

However, the JobFailed class is package private, and thus I cannot do this. Its
sibling class, JobSucceeded, is public, but obviously I want to handle failed
scenarios and be able to introspect the exception.
I did notice that the corresponding class in the Java API is public.

Is there another pattern I should follow to handle failures?
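
The best I've come up with so far is to key off JobSucceeded instead, which
at least tells me that something failed even though the exception itself
stays out of reach; a rough sketch, where the notification call is a
placeholder:

import org.apache.spark.scheduler.{JobSucceeded, SparkListener, SparkListenerJobEnd}

class FailureNotifier extends SparkListener {
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    if (jobEnd.jobResult != JobSucceeded) {
      notifyFailure(jobEnd.jobId)
    }
  }

  private def notifyFailure(jobId: Int): Unit = ()   // placeholder hook
}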

Thanks!