Re: Two Nodes :SparkContext Null Pointer

2017-04-10 Thread Sriram
Fixed it by submitting the second job as a child process. 
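For reference, a rough sketch of launching a second Spark job as a child
process with SparkLauncher; the jar path, main class and master URL below are
placeholders, not taken from the thread:

import org.apache.spark.launcher.SparkLauncher

// Launch the second job as a separate child process via spark-submit.
// All values here are illustrative.
val child = new SparkLauncher()
  .setAppResource("/path/to/export-app.jar")
  .setMainClass("com.example.ExportJob")
  .setMaster("spark://master-host:7077")
  .launch()                        // spawns spark-submit as a child process

val exitCode = child.waitFor()     // block until the child job finishes
println(s"Child job exited with code $exitCode")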

Thanks,
Sriram.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Two-Nodes-SparkContext-Null-Pointer-tp28582p28585.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Dataframes na fill with empty list

2017-04-10 Thread Sumona Routh
Hi there,
I have two dataframes that each have some columns which are of list type
(array generated by the collect_list function actually).

I need to outer join these two dfs; however, by nature of an outer join I am
sometimes left with null values. Normally I would use df.na.fill(...), but it
appears the fill function doesn't support this data type.

Can anyone recommend an alternative? I have also been playing around with
coalesce in a SQL expression, but I'm not having any luck there either.

Obviously, I can do a null check on the fields downstream, but it is not in
the spirit of Scala to pass around nulls, so I wanted to see if I was missing
another approach first.

Thanks,
Sumona

I am using Spark 2.0.2
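For reference, a rough sketch of the coalesce workaround described above,
filling a null array column with an empty array literal; the DataFrame and
column names are placeholders, and the empty-array cast should be verified on
2.0.2:

import org.apache.spark.sql.functions.{array, coalesce, col}

// After the outer join, replace null array<string> columns with an empty array.
// "joined" and "tags" are hypothetical names for the joined DataFrame and the
// collect_list-generated column.
val filled = joined.withColumn(
  "tags",
  coalesce(col("tags"), array().cast("array<string>")))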


Re: Is checkpointing in Spark Streaming Synchronous or Asynchronous ?

2017-04-10 Thread Tathagata Das
As of now (Spark 2.2), Structured Streaming checkpoints the state data
synchronously in every trigger. But the checkpointing is incremental, so it
won't be writing all of your state every time. And we will be making this
asynchronous soon.

On Fri, Apr 7, 2017 at 3:19 AM, kant kodali  wrote:

> Hi All,
>
> Is checkpointing in Spark Streaming Synchronous or Asynchronous? In other
> words, can Spark continue processing the stream while checkpointing?
>
> Thanks!
>


Re: Is the trigger interval the same as batch interval in structured streaming?

2017-04-10 Thread kant kodali
Perfect! Thanks a lot.

On Mon, Apr 10, 2017 at 1:39 PM, Tathagata Das 
wrote:

> The trigger interval is optionally specified in the writeStream option
> before start.
>
> val windowedCounts = words.groupBy(
>   window($"timestamp", "24 hours", "24 hours"),
>   $"word"
> ).count()
> .writeStream
> .trigger(ProcessingTime("10 seconds"))  // optional
> .format("memory")
> .queryName("tableName")
> .start()
>
> See the full example here - https://github.com/apache/
> spark/blob/master/examples/src/main/scala/org/apache/
> spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala
>
>
> On Mon, Apr 10, 2017 at 12:55 PM, kant kodali  wrote:
>
>> Thanks again! Looks like the update mode is not available in 2.1 (which
>> seems to be the latest version as of today) and I am assuming there will be
>> a way to specify trigger interval with the next release because with the
>> following code I don't see a way to specify trigger interval.
>>
>> val windowedCounts = words.groupBy(
>>   window($"timestamp", "24 hours", "24 hours"),
>>   $"word").count()
>>
>>
>> On Mon, Apr 10, 2017 at 12:32 PM, Michael Armbrust <
>> mich...@databricks.com> wrote:
>>
>>> It sounds like you want a tumbling window (where the slide and duration
>>> are the same).  This is the default if you give only one interval.  You
>>> should set the output mode to "update" (i.e. output only the rows that have
>>> been updated since the last trigger) and the trigger to "1 second".
>>>
>>> Try thinking about the batch query that would produce the answer you
>>> want.  Structured streaming will figure out an efficient way to compute
>>> that answer incrementally as new data arrives.
>>>
>>> On Mon, Apr 10, 2017 at 12:20 PM, kant kodali 
>>> wrote:
>>>
 Hi Michael,

 Thanks for the response. I guess I was thinking more in terms of the
 regular streaming model. so In this case I am little confused what my
 window interval and slide interval be for the following case?

 I need to hold a state (say a count) for 24 hours while capturing all
 its updates and produce results every second. I also need to reset the
 state (the count) back to zero every 24 hours.






 On Mon, Apr 10, 2017 at 11:49 AM, Michael Armbrust <
 mich...@databricks.com> wrote:

> Nope, structured streaming eliminates the limitation that
> micro-batching should affect the results of your streaming query.  Trigger
> is just an indication of how often you want to produce results (and if you
> leave it blank we just run as quickly as possible).
>
> To control how tuples are grouped into a window, take a look at the
> window
> 
> function.
>
> On Thu, Apr 6, 2017 at 10:26 AM, kant kodali 
> wrote:
>
>> Hi All,
>>
>> Is the trigger interval mentioned in this doc
>> 
>> the same as batch interval in structured streaming? For example I have a
>> long running receiver(not kafka) which sends me a real time stream I want
>> to use window interval, slide interval of 24 hours to create the Tumbling
>> window effect but I want to process updates every second.
>>
>> Thanks!
>>
>
>

>>>
>>
>


Re: Is the trigger interval the same as batch interval in structured streaming?

2017-04-10 Thread Tathagata Das
The trigger interval is optionally specified in the writeStream option
before start.

val windowedCounts = words.groupBy(
  window($"timestamp", "24 hours", "24 hours"),
  $"word"
).count()
.writeStream
.trigger(ProcessingTime("10 seconds"))  // optional
.format("memory")
.queryName("tableName")
.start()

See the full example here -
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala


On Mon, Apr 10, 2017 at 12:55 PM, kant kodali  wrote:

> Thanks again! Looks like the update mode is not available in 2.1 (which
> seems to be the latest version as of today) and I am assuming there will be
> a way to specify trigger interval with the next release because with the
> following code I don't see a way to specify trigger interval.
>
> val windowedCounts = words.groupBy(
>   window($"timestamp", "24 hours", "24 hours"),
>   $"word").count()
>
>
> On Mon, Apr 10, 2017 at 12:32 PM, Michael Armbrust  > wrote:
>
>> It sounds like you want a tumbling window (where the slide and duration
>> are the same).  This is the default if you give only one interval.  You
>> should set the output mode to "update" (i.e. output only the rows that have
>> been updated since the last trigger) and the trigger to "1 second".
>>
>> Try thinking about the batch query that would produce the answer you
>> want.  Structured streaming will figure out an efficient way to compute
>> that answer incrementally as new data arrives.
>>
>> On Mon, Apr 10, 2017 at 12:20 PM, kant kodali  wrote:
>>
>>> Hi Michael,
>>>
>>> Thanks for the response. I guess I was thinking more in terms of the
>>> regular streaming model. so In this case I am little confused what my
>>> window interval and slide interval be for the following case?
>>>
>>> I need to hold a state (say a count) for 24 hours while capturing all
>>> its updates and produce results every second. I also need to reset the
>>> state (the count) back to zero every 24 hours.
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Mon, Apr 10, 2017 at 11:49 AM, Michael Armbrust <
>>> mich...@databricks.com> wrote:
>>>
 Nope, structured streaming eliminates the limitation that
 micro-batching should affect the results of your streaming query.  Trigger
 is just an indication of how often you want to produce results (and if you
 leave it blank we just run as quickly as possible).

 To control how tuples are grouped into a window, take a look at the
 window
 
 function.

 On Thu, Apr 6, 2017 at 10:26 AM, kant kodali 
 wrote:

> Hi All,
>
> Is the trigger interval mentioned in this doc
> 
> the same as batch interval in structured streaming? For example I have a
> long running receiver(not kafka) which sends me a real time stream I want
> to use window interval, slide interval of 24 hours to create the Tumbling
> window effect but I want to process updates every second.
>
> Thanks!
>


>>>
>>
>


Re: Is the trigger interval the same as batch interval in structured streaming?

2017-04-10 Thread kant kodali
Thanks again! It looks like the update mode is not available in 2.1 (which
seems to be the latest version as of today). I am assuming there will be a way
to specify the trigger interval in the next release, because with the
following code I don't see a way to specify it.

val windowedCounts = words.groupBy(
  window($"timestamp", "24 hours", "24 hours"),
  $"word").count()


On Mon, Apr 10, 2017 at 12:32 PM, Michael Armbrust 
wrote:

> It sounds like you want a tumbling window (where the slide and duration
> are the same).  This is the default if you give only one interval.  You
> should set the output mode to "update" (i.e. output only the rows that have
> been updated since the last trigger) and the trigger to "1 second".
>
> Try thinking about the batch query that would produce the answer you
> want.  Structured streaming will figure out an efficient way to compute
> that answer incrementally as new data arrives.
>
> On Mon, Apr 10, 2017 at 12:20 PM, kant kodali  wrote:
>
>> Hi Michael,
>>
>> Thanks for the response. I guess I was thinking more in terms of the
>> regular streaming model. so In this case I am little confused what my
>> window interval and slide interval be for the following case?
>>
>> I need to hold a state (say a count) for 24 hours while capturing all its
>> updates and produce results every second. I also need to reset the state
>> (the count) back to zero every 24 hours.
>>
>>
>>
>>
>>
>>
>> On Mon, Apr 10, 2017 at 11:49 AM, Michael Armbrust <
>> mich...@databricks.com> wrote:
>>
>>> Nope, structured streaming eliminates the limitation that micro-batching
>>> should affect the results of your streaming query.  Trigger is just an
>>> indication of how often you want to produce results (and if you leave it
>>> blank we just run as quickly as possible).
>>>
>>> To control how tuples are grouped into a window, take a look at the
>>> window
>>> 
>>> function.
>>>
>>> On Thu, Apr 6, 2017 at 10:26 AM, kant kodali  wrote:
>>>
 Hi All,

 Is the trigger interval mentioned in this doc
 
 the same as batch interval in structured streaming? For example I have a
 long running receiver(not kafka) which sends me a real time stream I want
 to use window interval, slide interval of 24 hours to create the Tumbling
 window effect but I want to process updates every second.

 Thanks!

>>>
>>>
>>
>


Re: Is the trigger interval the same as batch interval in structured streaming?

2017-04-10 Thread Michael Armbrust
It sounds like you want a tumbling window (where the slide and duration are
the same).  This is the default if you give only one interval.  You should
set the output mode to "update" (i.e. output only the rows that have been
updated since the last trigger) and the trigger to "1 second".

Try thinking about the batch query that would produce the answer you want.
Structured streaming will figure out an efficient way to compute that
answer incrementally as new data arrives.
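
For reference, a rough sketch of the configuration described above (a 24-hour
tumbling window, "update" output mode, and a 1-second trigger), assuming a
streaming Dataset named words with timestamp and word columns as in the other
examples; note, as discussed in this thread, that the update output mode was
not yet available in 2.1:

import org.apache.spark.sql.functions.window
import org.apache.spark.sql.streaming.ProcessingTime
import spark.implicits._   // assuming the SparkSession is named spark

// A single interval means a tumbling 24-hour window.
val windowedCounts = words.groupBy(
    window($"timestamp", "24 hours"),
    $"word"
  ).count()

val query = windowedCounts.writeStream
  .outputMode("update")                   // emit only rows changed since the last trigger
  .trigger(ProcessingTime("1 second"))    // produce results every second
  .format("console")
  .start()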

On Mon, Apr 10, 2017 at 12:20 PM, kant kodali  wrote:

> Hi Michael,
>
> Thanks for the response. I guess I was thinking more in terms of the
> regular streaming model. so In this case I am little confused what my
> window interval and slide interval be for the following case?
>
> I need to hold a state (say a count) for 24 hours while capturing all its
> updates and produce results every second. I also need to reset the state
> (the count) back to zero every 24 hours.
>
>
>
>
>
>
> On Mon, Apr 10, 2017 at 11:49 AM, Michael Armbrust  > wrote:
>
>> Nope, structured streaming eliminates the limitation that micro-batching
>> should affect the results of your streaming query.  Trigger is just an
>> indication of how often you want to produce results (and if you leave it
>> blank we just run as quickly as possible).
>>
>> To control how tuples are grouped into a window, take a look at the
>> window
>> 
>> function.
>>
>> On Thu, Apr 6, 2017 at 10:26 AM, kant kodali  wrote:
>>
>>> Hi All,
>>>
>>> Is the trigger interval mentioned in this doc
>>> 
>>> the same as batch interval in structured streaming? For example I have a
>>> long running receiver(not kafka) which sends me a real time stream I want
>>> to use window interval, slide interval of 24 hours to create the Tumbling
>>> window effect but I want to process updates every second.
>>>
>>> Thanks!
>>>
>>
>>
>


Re: Is the trigger interval the same as batch interval in structured streaming?

2017-04-10 Thread kant kodali
Hi Michael,

Thanks for the response. I guess I was thinking more in terms of the regular
streaming model, so in this case I am a little confused about what my window
interval and slide interval should be for the following case:

I need to hold a state (say a count) for 24 hours while capturing all its
updates and produce results every second. I also need to reset the state
(the count) back to zero every 24 hours.






On Mon, Apr 10, 2017 at 11:49 AM, Michael Armbrust 
wrote:

> Nope, structured streaming eliminates the limitation that micro-batching
> should affect the results of your streaming query.  Trigger is just an
> indication of how often you want to produce results (and if you leave it
> blank we just run as quickly as possible).
>
> To control how tuples are grouped into a window, take a look at the window
> 
> function.
>
> On Thu, Apr 6, 2017 at 10:26 AM, kant kodali  wrote:
>
>> Hi All,
>>
>> Is the trigger interval mentioned in this doc
>> 
>> the same as batch interval in structured streaming? For example I have a
>> long running receiver(not kafka) which sends me a real time stream I want
>> to use window interval, slide interval of 24 hours to create the Tumbling
>> window effect but I want to process updates every second.
>>
>> Thanks!
>>
>
>


Re: Cant convert Dataset to case class with Option fields

2017-04-10 Thread Michael Armbrust
Options should work.  Can you give a full example that is freezing?  Which
version of Spark are you using?
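
For reference, a minimal self-contained sketch of the pattern under discussion
(a case class with Option fields converted with .as); the case classes and
data here are hypothetical, not the original poster's:

import org.apache.spark.sql.SparkSession

// Hypothetical minimal reproduction of the pattern in question.
case class B(x: Int)
case class C(y: String)
case class A(b: Option[B] = None, c: Option[C] = None)

object OptionAsExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("OptionAsExample")
      .getOrCreate()
    import spark.implicits._

    // Build a DataFrame whose schema matches A, then convert it with .as[A].
    val df = Seq(A(Some(B(1)), None), A(None, Some(C("z")))).toDF()
    val ds = df.as[A]   // Option fields should decode missing structs as None
    ds.show()
  }
}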

On Fri, Apr 7, 2017 at 6:59 AM, Dirceu Semighini Filho <
dirceu.semigh...@gmail.com> wrote:

> Hi Devs,
> I have some case classes here, and their fields are all optional:
> case class A(b:Option[B] = None, c: Option[C] = None, ...)
>
> If I read some data into a Dataset and try to convert it to this case class
> using the as method, it doesn't give me any answer, it simply freezes.
> If I change the case class to
>
> case class A(b:B,c:C)
> it works nicely and returns the field values as null.
>
> Are Option fields not supported by the as method, or is this an issue?
>
> Kind Regards,
> Dirceu
>


Re: Is the trigger interval the same as batch interval in structured streaming?

2017-04-10 Thread Michael Armbrust
Nope, structured streaming eliminates the limitation that micro-batching
should affect the results of your streaming query.  Trigger is just an
indication of how often you want to produce results (and if you leave it
blank we just run as quickly as possible).

To control how tuples are grouped into a window, take a look at the window

function.

On Thu, Apr 6, 2017 at 10:26 AM, kant kodali  wrote:

> Hi All,
>
> Is the trigger interval mentioned in this doc
> 
> the same as batch interval in structured streaming? For example I have a
> long running receiver(not kafka) which sends me a real time stream I want
> to use window interval, slide interval of 24 hours to create the Tumbling
> window effect but I want to process updates every second.
>
> Thanks!
>


Re: pandas DF Dstream to Spark DF

2017-04-10 Thread Bryan Cutler
Hi Yogesh,

It would be easier to help if you included your code and the exact error
messages that occur.  If you are creating a Spark DataFrame from a Pandas
DataFrame, Spark does not read a schema from Pandas; it infers one from the
data.  This might be the cause of your issue if the schema is not inferred
correctly.  You can try to specify the schema manually, for example like this:

import pandas
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

# Define the schema explicitly so Spark does not have to infer the column types
schema = StructType([
    StructField("str_t", StringType(), True),
    StructField("int_t", IntegerType(), True),
    StructField("double_t", DoubleType(), True)])

pandas_df = pandas.DataFrame(data={...})
spark_df = spark.createDataFrame(pandas_df, schema=schema)

This step might be eliminated by using Apache Arrow, see SPARK-13534 for
related work.

On Sun, Apr 9, 2017 at 10:19 PM, Yogesh Vyas  wrote:

> Hi,
>
> I am writing a pyspark streaming job in which i am returning a pandas data
> frame as DStream. Now I wanted to save this DStream dataframe to parquet
> file. How to do that?
>
> I am trying to convert it to spark data frame but I am getting multiple
> errors. Please suggest me how to do that.
>
> Regards,
> Yogesh
>


Re: Assigning a unique row ID

2017-04-10 Thread Everett Anderson
Indeed, I tried persist with MEMORY_AND_DISK and it works! (I'm wary of
MEMORY_ONLY for this as it could potentially recompute shards if it
couldn't entirely cache in memory.)

Thanks for the help, everybody!!
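
For reference, a rough sketch of the approach that worked here, assigning IDs
once and persisting so that all derived tables see the same values; the
DataFrame and column names are placeholders:

import org.apache.spark.sql.functions.{col, monotonically_increasing_id}
import org.apache.spark.storage.StorageLevel

// Assign the ID column once and persist before deriving other tables,
// so the IDs are not recomputed differently for each output.
val withIds = df.withColumn("row_id", monotonically_increasing_id())
  .persist(StorageLevel.MEMORY_AND_DISK)

// Derived tables now share the same row_id values for the same rows.
val x = withIds.filter(col("someFlag"))
val y = withIds.select(col("row_id"), col("otherCol"))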

On Sat, Apr 8, 2017 at 11:54 AM, Everett Anderson  wrote:

>
>
> On Fri, Apr 7, 2017 at 8:04 PM, Subhash Sriram 
> wrote:
>
>> Hi,
>>
>> We use monotonically_increasing_id() as well, but just cache the table
>> first like Ankur suggested. With that method, we get the same keys in all
>> derived tables.
>>
>
> Ah, okay, awesome. Let me give that a go.
>
>
>
>>
>> Thanks,
>> Subhash
>>
>> Sent from my iPhone
>>
>> On Apr 7, 2017, at 7:32 PM, Everett Anderson 
>> wrote:
>>
>> Hi,
>>
>> Thanks, but that's using a random UUID. Certainly unlikely to have
>> collisions, but not guaranteed.
>>
>> I'd rather prefer something like monotonically_increasing_id or RDD's
>> zipWithUniqueId but with better behavioral characteristics -- so they don't
>> surprise people when 2+ outputs derived from an original table end up not
>> having the same IDs for the same rows, anymore.
>>
>> It seems like this would be possible under the covers, but would have the
>> performance penalty of needing to do perhaps a count() and then also a
>> checkpoint.
>>
>> I was hoping there's a better way.
>>
>>
>> On Fri, Apr 7, 2017 at 4:24 PM, Tim Smith  wrote:
>>
>>> http://stackoverflow.com/questions/37231616/add-a-new-column
>>> -to-a-dataframe-new-column-i-want-it-to-be-a-uuid-generator
>>>
>>>
>>> On Fri, Apr 7, 2017 at 3:56 PM, Everett Anderson <
>>> ever...@nuna.com.invalid> wrote:
>>>
 Hi,

 What's the best way to assign a truly unique row ID (rather than a
 hash) to a DataFrame/Dataset?

 I originally thought that functions.monotonically_increasing_id would
 do this, but it seems to have a rather unfortunate property that if you add
 it as a column to table A and then derive tables X, Y, Z and save those,
 the row ID values in X, Y, and Z may end up different. I assume this is
 because it delays the actual computation to the point where each of those
 tables is computed.


>>>
>>>
>>> --
>>>
>>> --
>>> Thanks,
>>>
>>> Tim
>>>
>>
>>
>


Re: unit testing in spark

2017-04-10 Thread Jörn Franke

I think in the end you need to check the coverage of your application. If your
application is well covered at the job or pipeline level (it depends, however,
on how you implement these tests), then that can be fine.
In the end it really depends on the data and what kind of transformations you
implement. For example, if 90% of your job uses standard transformations but
10% are more or less complex customized functions, then it might be worth
testing those functions with many different data inputs as unit tests and
having integrated job/pipeline tests in addition.
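
For reference, a rough sketch of what such a function-level unit test could
look like; the transformation, values and choice of ScalaTest are illustrative
only:

import org.scalatest.FunSuite

// A hypothetical customized transformation, kept free of Spark so it can be
// unit tested in isolation with many different inputs.
object Transformations {
  def normalizeCountryCode(raw: String): String =
    Option(raw).map(_.trim.toUpperCase).filter(_.nonEmpty).getOrElse("UNKNOWN")
}

class TransformationsSuite extends FunSuite {
  test("normalizeCountryCode handles varied inputs") {
    assert(Transformations.normalizeCountryCode(" fi ") == "FI")
    assert(Transformations.normalizeCountryCode(null) == "UNKNOWN")
    assert(Transformations.normalizeCountryCode("") == "UNKNOWN")
  }
}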

> On 10. Apr 2017, at 15:46, Gokula Krishnan D  wrote:
> 
> Hello Shiv, 
> 
> Unit Testing is really helping when you follow TDD approach. And it's a safe 
> way to code a program locally and also you can make use those test cases 
> during the build process by using any of the continuous integration tools ( 
> Bamboo, Jenkins). If so you can ensure that artifacts are being tested before 
> deploying into Cluster.
> 
> 
> Thanks & Regards, 
> Gokula Krishnan (Gokul)
> 
>> On Wed, Apr 5, 2017 at 7:32 AM, Shiva Ramagopal  wrote:
>> Hi,
>> 
>> I've been following this thread for a while. 
>> 
>> I'm trying to bring in a test strategy in my team to test a number of data 
>> pipelines before production. I have watched Lars' presentation and find it 
>> great. However I'm debating whether unit tests are worth the effort if there 
>> are good job-level and pipeline-level tests. Does anybody have any 
>> experiences benefitting from unit-tests in such a case?
>> 
>> Cheers,
>> Shiv
>> 
>>> On Mon, Dec 12, 2016 at 6:00 AM, Juan Rodríguez Hortalá 
>>>  wrote:
>>> Hi all, 
>>> 
>>> I would also would like to participate on that. 
>>> 
>>> Greetings, 
>>> 
>>> Juan 
>>> 
 On Fri, Dec 9, 2016 at 6:03 AM, Michael Stratton 
  wrote:
 That sounds great, please include me so I can get involved.
 
> On Fri, Dec 9, 2016 at 7:39 AM, Marco Mistroni  
> wrote:
> Me too as I spent most of my time writing unit/integ tests  pls 
> advise on where I  can start
> Kr
> 
>> On 9 Dec 2016 12:15 am, "Miguel Morales"  wrote:
>> I would be interested in contributing.  Ive created my own library for 
>> this as well.  In my blog post I talk about testing with Spark in RSpec 
>> style: 
>> https://medium.com/@therevoltingx/test-driven-development-w-apache-spark-746082b44941
>> 
>> Sent from my iPhone
>> 
>>> On Dec 8, 2016, at 4:09 PM, Holden Karau  wrote:
>>> 
>>> There are also libraries designed to simplify testing Spark in the 
>>> various platforms, spark-testing-base for Scala/Java/Python (& video 
>>> https://www.youtube.com/watch?v=f69gSGSLGrY), sscheck (scala focused 
>>> property based), pyspark.test (python focused with py.test instead of 
>>> unittest2) (& blog post from nextdoor 
>>> https://engblog.nextdoor.com/unit-testing-apache-spark-with-py-test-3b8970dc013b#.jw3bdcej9
>>>  )
>>> 
>>> Good luck on your Spark Adventures :)
>>> 
>>> P.S.
>>> 
>>> If anyone is interested in helping improve spark testing libraries I'm 
>>> always looking for more people to be involved with spark-testing-base 
>>> because I'm lazy :p
>>> 
 On Thu, Dec 8, 2016 at 2:05 PM, Lars Albertsson  
 wrote:
 I wrote some advice in a previous post on the list:
 http://markmail.org/message/bbs5acrnksjxsrrs
 
 It does not mention python, but the strategy advice is the same. Just
 replace JUnit/Scalatest with pytest, unittest, or your favourite
 python test framework.
 
 
 I recently held a presentation on the subject. There is a video
 recording at https://vimeo.com/192429554 and slides at
 http://www.slideshare.net/lallea/test-strategies-for-data-processing-pipelines-67244458
 
 You can find more material on test strategies at
 http://www.mapflat.com/lands/resources/reading-list/index.html
 
 
 
 
 Lars Albertsson
 Data engineering consultant
 www.mapflat.com
 https://twitter.com/lalleal
 +46 70 7687109
 Calendar: https://goo.gl/6FBtlS, https://freebusy.io/la...@mapflat.com
 
 
 On Thu, Dec 8, 2016 at 4:14 PM, pseudo oduesp  
 wrote:
 > somone can tell me how i can make unit test on pyspark ?
 > (book, tutorial ...)
 
 -
 To unsubscribe e-mail: user-unsubscr...@spark.apache.org
 
>>> 
>>> 
>>> 
>>> -- 
>>> Cell : 425-233-8271
>>> Twitter: 

Re: unit testing in spark

2017-04-10 Thread Gokula Krishnan D
Hello Shiv,

Unit testing really helps when you follow a TDD approach. It's a safe way to
code a program locally, and you can also make use of those test cases during
the build process with any of the continuous integration tools (Bamboo,
Jenkins). That way you can ensure that artifacts are tested before being
deployed into the cluster.


Thanks & Regards,
Gokula Krishnan (Gokul)

On Wed, Apr 5, 2017 at 7:32 AM, Shiva Ramagopal  wrote:

> Hi,
>
> I've been following this thread for a while.
>
> I'm trying to bring in a test strategy in my team to test a number of data
> pipelines before production. I have watched Lars' presentation and find it
> great. However I'm debating whether unit tests are worth the effort if
> there are good job-level and pipeline-level tests. Does anybody have any
> experiences benefitting from unit-tests in such a case?
>
> Cheers,
> Shiv
>
> On Mon, Dec 12, 2016 at 6:00 AM, Juan Rodríguez Hortalá <
> juan.rodriguez.hort...@gmail.com> wrote:
>
>> Hi all,
>>
>> I would also would like to participate on that.
>>
>> Greetings,
>>
>> Juan
>>
>> On Fri, Dec 9, 2016 at 6:03 AM, Michael Stratton <
>> michael.strat...@komodohealth.com> wrote:
>>
>>> That sounds great, please include me so I can get involved.
>>>
>>> On Fri, Dec 9, 2016 at 7:39 AM, Marco Mistroni 
>>> wrote:
>>>
 Me too as I spent most of my time writing unit/integ tests  pls
 advise on where I  can start
 Kr

 On 9 Dec 2016 12:15 am, "Miguel Morales" 
 wrote:

> I would be interested in contributing.  Ive created my own library for
> this as well.  In my blog post I talk about testing with Spark in RSpec
> style:
> https://medium.com/@therevoltingx/test-driven-development-w-
> apache-spark-746082b44941
>
> Sent from my iPhone
>
> On Dec 8, 2016, at 4:09 PM, Holden Karau  wrote:
>
> There are also libraries designed to simplify testing Spark in the
> various platforms, spark-testing-base
>  for Scala/Java/Python
> (& video https://www.youtube.com/watch?v=f69gSGSLGrY), sscheck
>  (scala focused property based),
> pyspark.test (python focused with py.test instead of unittest2) (&
> blog post from nextdoor https://engblog.nextd
> oor.com/unit-testing-apache-spark-with-py-test-3b8970dc013b#.jw3bdcej9
>  )
>
> Good luck on your Spark Adventures :)
>
> P.S.
>
> If anyone is interested in helping improve spark testing libraries I'm
> always looking for more people to be involved with spark-testing-base
> because I'm lazy :p
>
> On Thu, Dec 8, 2016 at 2:05 PM, Lars Albertsson 
> wrote:
>
>> I wrote some advice in a previous post on the list:
>> http://markmail.org/message/bbs5acrnksjxsrrs
>>
>> It does not mention python, but the strategy advice is the same. Just
>> replace JUnit/Scalatest with pytest, unittest, or your favourite
>> python test framework.
>>
>>
>> I recently held a presentation on the subject. There is a video
>> recording at https://vimeo.com/192429554 and slides at
>> http://www.slideshare.net/lallea/test-strategies-for-data-pr
>> ocessing-pipelines-67244458
>>
>> You can find more material on test strategies at
>> http://www.mapflat.com/lands/resources/reading-list/index.html
>>
>>
>>
>>
>> Lars Albertsson
>> Data engineering consultant
>> www.mapflat.com
>> https://twitter.com/lalleal
>> +46 70 7687109
>> Calendar: https://goo.gl/6FBtlS, https://freebusy.io/lalle@mapf
>> lat.com
>>
>>
>> On Thu, Dec 8, 2016 at 4:14 PM, pseudo oduesp 
>> wrote:
>> > somone can tell me how i can make unit test on pyspark ?
>> > (book, tutorial ...)
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>
>
> --
> Cell : 425-233-8271 <(425)%20233-8271>
> Twitter: https://twitter.com/holdenkarau
>
>
>>>
>>
>


Re: How to convert Spark MLlib vector to ML Vector?

2017-04-10 Thread Md. Rezaul Karim
Hi Yan, Ryan, and Nick,

Actually, for a special use case, I had to use the RDD-based Spark MLlib,
which eventually did not work. Therefore, I switched to Spark ML later on.

Thanks for your support guys.




Regards,
_
Md. Rezaul Karim, BSc, MSc
PhD Researcher, INSIGHT Centre for Data Analytics
National University of Ireland, Galway
IDA Business Park, Dangan, Galway, Ireland
Web: http://www.reza-analytics.eu/index.html


On 10 April 2017 at 06:45, 颜发才(Yan Facai)  wrote:

> how about using
>
> val dataset = spark.read.format("libsvm")
>   .option("numFeatures", "780")
>   .load("data/mllib/sample_libsvm_data.txt")
>
> instead of
> val dataset = MLUtils.loadLibSVMFile(spark.sparkContext, "data/mnist.bz2")
>
>
>
>
>
> On Mon, Apr 10, 2017 at 11:19 AM, Ryan  wrote:
>
>> you could write a udf using the asML method along with some type casting,
>> then apply the udf to data after pca.
>>
>> when using pipeline, that udf need to be wrapped in a customized
>> transformer, I think.
>>
>> On Sun, Apr 9, 2017 at 10:07 PM, Nick Pentreath > > wrote:
>>
>>> Why not use the RandomForest from Spark ML?
>>>
>>> On Sun, 9 Apr 2017 at 16:01, Md. Rezaul Karim <
>>> rezaul.ka...@insight-centre.org> wrote:
>>>
 I have already posted this question to the StackOverflow
 .
 However, not getting any response from someone else. I'm trying to use
 RandomForest algorithm for the classification after applying the PCA
 technique since the dataset is pretty high-dimensional. Here's my source
 code:

 import org.apache.spark.mllib.util.MLUtils
 import org.apache.spark.mllib.tree.RandomForest
 import org.apache.spark.mllib.tree.model.RandomForestModel
 import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.ml.linalg.{Vectors, VectorUDT}
 import org.apache.spark.sql._
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.SparkSession

 import org.apache.spark.ml.feature.PCA
 import org.apache.spark.rdd.RDD

 object PCAExample {
   def main(args: Array[String]): Unit = {
 val spark = SparkSession
   .builder
   .master("local[*]")
   .config("spark.sql.warehouse.dir", "E:/Exp/")
   .appName(s"OneVsRestExample")
   .getOrCreate()

 val dataset = MLUtils.loadLibSVMFile(spark.sparkContext, 
 "data/mnist.bz2")

 val splits = dataset.randomSplit(Array(0.7, 0.3), seed = 12345L)
 val (trainingData, testData) = (splits(0), splits(1))

 val sqlContext = new SQLContext(spark.sparkContext)
 import sqlContext.implicits._
 val trainingDF = trainingData.toDF("label", "features")

 val pca = new PCA()
   .setInputCol("features")
   .setOutputCol("pcaFeatures")
   .setK(100)
   .fit(trainingDF)

 val pcaTrainingData = pca.transform(trainingDF)
 //pcaTrainingData.show()

 val labeled = pca.transform(trainingDF).rdd.map(row => LabeledPoint(
   row.getAs[Double]("label"),
   row.getAs[org.apache.spark.mllib.linalg.Vector]("pcaFeatures")))

 //val labeled = pca.transform(trainingDF).rdd.map(row => 
 LabeledPoint(row.getAs[Double]("label"),
 //  
 Vector.fromML(row.getAs[org.apache.spark.ml.linalg.SparseVector]("features"

 val numClasses = 10
 val categoricalFeaturesInfo = Map[Int, Int]()
 val numTrees = 10 // Use more in practice.
 val featureSubsetStrategy = "auto" // Let the algorithm choose.
 val impurity = "gini"
 val maxDepth = 20
 val maxBins = 32

 val model = RandomForest.trainClassifier(labeled, numClasses, 
 categoricalFeaturesInfo,
   numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins)
   }
 }

 However, I'm getting the following error:

 *Exception in thread "main" java.lang.IllegalArgumentException:
 requirement failed: Column features must be of type
 org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7 but was actually
 org.apache.spark.mllib.linalg.VectorUDT@f71b0bce.*

 What am I doing wrong in my code?  Actually, I'm getting the above
 exception in this line:

 val pca = new PCA()
   .setInputCol("features")
   .setOutputCol("pcaFeatures")
   .setK(100)
   .fit(trainingDF) /// GETTING EXCEPTION HERE

 Please, someone, help me to solve the problem.





 Kind regards,
 *Md. Rezaul Karim*

>>>
>>
>
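
For reference, a rough sketch of the conversion Ryan describes above (wrapping
asML in a UDF to turn an old mllib vector column into an ml vector column);
the DataFrame and column names are placeholders:

import org.apache.spark.mllib.linalg.{Vector => MLlibVector}
import org.apache.spark.sql.functions.{col, udf}

// df is a hypothetical DataFrame whose "features" column holds old
// mllib.linalg vectors; convert it so spark.ml stages (such as the PCA above)
// accept it.
val toML = udf((v: MLlibVector) => v.asML)

val converted = df.withColumn("features", toML(col("features")))

// Alternatively, MLUtils.convertVectorColumnsToML(df, "features") from
// org.apache.spark.mllib.util performs the same column conversion.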


Re: Does spark 2.1.0 structured streaming support jdbc sink?

2017-04-10 Thread Ofir Manor
Also check SPARK-19478 - JDBC sink (seems to be waiting for a review)

Ofir Manor

Co-Founder & CTO | Equalum

Mobile: +972-54-7801286 | Email: ofir.ma...@equalum.io

On Mon, Apr 10, 2017 at 10:10 AM, Hemanth Gudela  wrote:

> Many thanks Silvio for the link. That’s exactly what I’m looking for. ☺
>
> However there is no mentioning of checkpoint support for custom
> “ForeachWriter” in structured streaming. I’m going to test that now.
>
>
>
> Good question Gary, this is the mentioning in the link
> 
> .
>
> Often times we want to be able to write output of streams to external
> databases such as MySQL. At the time of writing, the Structured Streaming
> API does not support external databases as sinks; however, when it does,
> the API option will be as simple as .format("jdbc").start("jdbc:mysql/..").
>
>
> In the meantime, we can use the foreach sink to accomplish this. Let’s
> create a custom JDBC Sink that extends *ForeachWriter* and implements its
> methods.
>
>
>
> I’m not sure though if jdbc sink feature will be available in upcoming
> spark (2.2.0?) version or not.
>
> It would good to know if someone has information about it.
>
>
>
> Thanks,
>
> Hemanth
>
>
>
> *From: *"lucas.g...@gmail.com" 
> *Date: *Monday, 10 April 2017 at 8.24
> *To: *"user@spark.apache.org" 
> *Subject: *Re: Does spark 2.1.0 structured streaming support jdbc sink?
>
>
>
> Interesting, does anyone know if we'll be seeing the JDBC sinks in
> upcoming releases?
>
>
>
> Thanks!
>
>
>
> Gary Lucas
>
>
>
> On 9 April 2017 at 13:52, Silvio Fiorito 
> wrote:
>
> JDBC sink is not in 2.1. You can see here for an example implementation
> using the ForEachWriter sink instead: https://databricks.com/blog/
> 2017/04/04/real-time-end-to-end-integration-with-apache-
> kafka-in-apache-sparks-structured-streaming.html
>
>
>
>
>
> *From: *Hemanth Gudela 
> *Date: *Sunday, April 9, 2017 at 4:30 PM
> *To: *"user@spark.apache.org" 
> *Subject: *Does spark 2.1.0 structured streaming support jdbc sink?
>
>
>
> Hello Everyone,
>
> I am new to Spark, especially spark streaming.
>
>
>
> I am trying to read an input stream from Kafka, perform windowed
> aggregations in spark using structured streaming, and finally write
> aggregates to a sink.
>
> -  MySQL as an output sink doesn’t seem to be an option, because
> this block of code throws an error
>
> streamingDF.writeStream.format("jdbc").start("jdbc:mysql…”)
>
> *ava.lang.UnsupportedOperationException*: Data source jdbc does not
> support streamed writing
>
> This is strange because, this
> 
> document shows that jdbc is supported as an output sink!
>
>
>
> -  Parquet doesn’t seem to be an option, because it doesn’t
> support “complete” output mode, but “append” only. As I’m preforming
> windows aggregations in spark streaming, the output mode has to be
> complete, and cannot be “append”
>
>
>
> -  Memory and console sinks are good for debugging, but are not
> suitable for production jobs.
>
>
>
> So, please correct me if I’m missing something in my code to enable jdbc
> output sink.
>
> If jdbc output sink is not option, please suggest me an alternative output
> sink that suits my needs better.
>
>
>
> Or since structured streaming is still ‘alpha’, should I resort to spark
> dstreams to achieve my use case described above.
>
> Please suggest.
>
>
>
> Thanks in advance,
>
> Hemanth
>
>
>


Two Nodes :SparkContext Null Pointer

2017-04-10 Thread Sriram
Hello Everyone,

I need support on this scenario.
We have two masters and three worker nodes, all configured in a standalone
cluster. There are two jobs deployed on all the worker nodes: one job is a
Quartz scheduler and the other job is an exporting application.

The Quartz scheduler job is submitted from the shell, and when the cron
triggers, the other job is launched from the first job using the Java Spark
launcher. Both jobs run fine on the same worker node, but when the master
chooses different nodes, the second job is unable to create a SparkContext.
Any ideas?

Thanks,
Sriram.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Two-Nodes-SparkContext-Null-Pointer-tp28582.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Does spark 2.1.0 structured streaming support jdbc sink?

2017-04-10 Thread Hemanth Gudela
Many thanks Silvio for the link. That’s exactly what I’m looking for. ☺
However, there is no mention of checkpoint support for a custom “ForeachWriter”
in structured streaming. I’m going to test that now.

Good question Gary, this is what is mentioned in the link:
Often times we want to be able to write output of streams to external databases 
such as MySQL. At the time of writing, the Structured Streaming API does not 
support external databases as sinks; however, when it does, the API option will 
be as simple as .format("jdbc").start("jdbc:mysql/..").
In the meantime, we can use the foreach sink to accomplish this. Let’s create a 
custom JDBC Sink that extends ForeachWriter and implements its methods.
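
For reference, a rough sketch of such a ForeachWriter-based JDBC sink; the
connection details, table name and column handling below are placeholders, and
no batching, upserts or escaping are shown:

import java.sql.{Connection, DriverManager, Statement}
import org.apache.spark.sql.{ForeachWriter, Row}

// Minimal sketch: open a JDBC connection per partition, write each row, close.
class JdbcSink(url: String, user: String, pwd: String) extends ForeachWriter[Row] {
  var connection: Connection = _
  var statement: Statement = _

  override def open(partitionId: Long, version: Long): Boolean = {
    connection = DriverManager.getConnection(url, user, pwd)
    statement = connection.createStatement()
    true   // returning true means process() will be called for this partition
  }

  override def process(row: Row): Unit = {
    // "aggregates" and the column positions are illustrative only.
    statement.executeUpdate(
      s"INSERT INTO aggregates VALUES ('${row.getString(0)}', ${row.getLong(1)})")
  }

  override def close(errorOrNull: Throwable): Unit = {
    connection.close()
  }
}

// Usage: streamingDF.writeStream.foreach(new JdbcSink(jdbcUrl, user, pwd)).start()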

I’m not sure, though, whether the jdbc sink feature will be available in the
upcoming Spark (2.2.0?) release or not.
It would be good to know if someone has information about it.

Thanks,
Hemanth

From: "lucas.g...@gmail.com" 
Date: Monday, 10 April 2017 at 8.24
To: "user@spark.apache.org" 
Subject: Re: Does spark 2.1.0 structured streaming support jdbc sink?

Interesting, does anyone know if we'll be seeing the JDBC sinks in upcoming 
releases?

Thanks!

Gary Lucas

On 9 April 2017 at 13:52, Silvio Fiorito 
> wrote:
JDBC sink is not in 2.1. You can see here for an example implementation using 
the ForEachWriter sink instead: 
https://databricks.com/blog/2017/04/04/real-time-end-to-end-integration-with-apache-kafka-in-apache-sparks-structured-streaming.html


From: Hemanth Gudela 
>
Date: Sunday, April 9, 2017 at 4:30 PM
To: "user@spark.apache.org" 
>
Subject: Does spark 2.1.0 structured streaming support jdbc sink?

Hello Everyone,
I am new to Spark, especially spark streaming.

I am trying to read an input stream from Kafka, perform windowed aggregations 
in spark using structured streaming, and finally write aggregates to a sink.

-  MySQL as an output sink doesn’t seem to be an option, because this 
block of code throws an error

streamingDF.writeStream.format("jdbc").start("jdbc:mysql…”)

ava.lang.UnsupportedOperationException: Data source jdbc does not support 
streamed writing

This is strange because, 
this 
document shows that jdbc is supported as an output sink!



-  Parquet doesn’t seem to be an option, because it doesn’t support 
“complete” output mode, but “append” only. As I’m preforming windows 
aggregations in spark streaming, the output mode has to be complete, and cannot 
be “append”


-  Memory and console sinks are good for debugging, but are not 
suitable for production jobs.

So, please correct me if I’m missing something in my code to enable jdbc output 
sink.
If jdbc output sink is not option, please suggest me an alternative output sink 
that suits my needs better.

Or since structured streaming is still ‘alpha’, should I resort to spark 
dstreams to achieve my use case described above.
Please suggest.

Thanks in advance,
Hemanth