[ANNOUNCE] Apache Spark 2.1.1

2017-05-02 Thread Michael Armbrust
We are happy to announce the availability of Spark 2.1.1!

Apache Spark 2.1.1 is a maintenance release, based on the branch-2.1
maintenance branch of Spark. We strongly recommend that all 2.1.x users
upgrade to this stable release.

To download Apache Spark 2.1.1 visit http://spark.apache.org/downloads.html

We would like to acknowledge all community members for contributing patches
to this release.


Re: Joins in Spark

2017-05-02 Thread Angel Francisco Orta
Sorry, I had a typo; I meant repartitioning by the join field, i.e. repartition($"fieldofjoin").

On May 2, 2017 9:44 PM, "KhajaAsmath Mohammed" wrote:

Hi Angel,

I am trying the code below, but I don't see the partitioning on the dataframe.

  val iftaGPSLocation_df = sqlContext.sql(iftaGPSLocQry)
  import sqlContext._
  import sqlContext.implicits._
  datapoint_prq_df.join(geoCacheLoc_df)

val tableA = DfA.repartition($"joinField").filter("firstSegment")

The columns I have are Lat3, Lon3, VIN, and Time. Lat3 and Lon3 are my join
columns on both dataframes, and the rest are select columns.

Thanks,
Asmath



On Tue, May 2, 2017 at 1:38 PM, Angel Francisco Orta <
angel.francisco.o...@gmail.com> wrote:

> Have you tried partitioning by the join field and running the join in segments,
> filtering both tables to the same segment of data?
>
> Example:
>
> val tableA = DfA.repartition($"joinField").filter("firstSegment")
> val tableB = DfB.repartition($"joinField").filter("firstSegment")
>
> tableA.join(tableB)
>
> On May 2, 2017 8:30 PM, "KhajaAsmath Mohammed" wrote:
>
>> Table 1 (192 GB) is partitioned by year and month; the 192 GB of data is
>> for one month, i.e., April.
>>
>> Table 2: 92 GB, not partitioned.
>>
>> I have to perform a join on these tables now.
>>
>>
>>
>> On Tue, May 2, 2017 at 1:27 PM, Angel Francisco Orta <
>> angel.francisco.o...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> Are the tables partitioned?
>>> If yes, what is the partition field?
>>>
>>> Thanks
>>>
>>>
>>> On May 2, 2017 8:22 PM, "KhajaAsmath Mohammed" <mdkhajaasm...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> I am trying to join two big tables in Spark, and the job has been running
>>> for quite a long time without any results.
>>>
>>> Table 1: 192GB
>>> Table 2: 92 GB
>>>
>>> Does anyone have a better solution to get the results faster?
>>>
>>> Thanks,
>>> Asmath
>>>
>>>
>>>
>>
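
A note on the approach discussed above: since both tables (192 GB and 92 GB)
are far too large to broadcast, a shuffle join is unavoidable, and the
suggestion boils down to repartitioning both sides on the join keys and
pruning each side before the join. A minimal sketch in the DataFrame API,
assuming the two DataFrames from this thread (datapoint_prq_df and
geoCacheLoc_df) and that Lat3/Lon3 are the join columns on both sides:

import org.apache.spark.sql.functions.col

// Repartition both sides on the join keys so matching keys end up in the
// same partitions, and keep only the needed columns to cut shuffle volume.
val left = datapoint_prq_df
  .select("Lat3", "Lon3", "VIN", "Time")
  .repartition(col("Lat3"), col("Lon3"))

val right = geoCacheLoc_df
  .repartition(col("Lat3"), col("Lon3"))

val joined = left.join(right, Seq("Lat3", "Lon3"))

Whether the repartitioning takes effect is easier to see in joined.explain
(look for an Exchange on Lat3, Lon3) or left.rdd.getNumPartitions than by
inspecting the DataFrame itself.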


Re: Joins in Spark

2017-05-02 Thread KhajaAsmath Mohammed
Hi Angel,

I am trying the code below, but I don't see the partitioning on the dataframe.

  val iftaGPSLocation_df = sqlContext.sql(iftaGPSLocQry)
  import sqlContext._
  import sqlContext.implicits._
  datapoint_prq_df.join(geoCacheLoc_df)

val tableA = DfA.repartition($"joinField").filter("firstSegment")

The columns I have are Lat3, Lon3, VIN, and Time. Lat3 and Lon3 are my join
columns on both dataframes, and the rest are select columns.

Thanks,
Asmath



On Tue, May 2, 2017 at 1:38 PM, Angel Francisco Orta <
angel.francisco.o...@gmail.com> wrote:

> Have you tried partitioning by the join field and running the join in segments,
> filtering both tables to the same segment of data?
>
> Example:
>
> val tableA = DfA.repartition($"joinField").filter("firstSegment")
> val tableB = DfB.repartition($"joinField").filter("firstSegment")
>
> tableA.join(tableB)
>
> On May 2, 2017 8:30 PM, "KhajaAsmath Mohammed" wrote:
>
>> Table 1 (192 GB) is partitioned by year and month; the 192 GB of data is
>> for one month, i.e., April.
>>
>> Table 2: 92 GB, not partitioned.
>>
>> I have to perform a join on these tables now.
>>
>>
>>
>> On Tue, May 2, 2017 at 1:27 PM, Angel Francisco Orta <
>> angel.francisco.o...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> Are the tables partitioned?
>>> If yes, what is the partition field?
>>>
>>> Thanks
>>>
>>>
>>> On May 2, 2017 8:22 PM, "KhajaAsmath Mohammed" <mdkhajaasm...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> I am trying to join two big tables in Spark, and the job has been running
>>> for quite a long time without any results.
>>>
>>> Table 1: 192GB
>>> Table 2: 92 GB
>>>
>>> Does anyone have a better solution to get the results faster?
>>>
>>> Thanks,
>>> Asmath
>>>
>>>
>>>
>>


Re: [Spark Streaming] Dynamic Broadcast Variable Update

2017-05-02 Thread Tim Smith
One, I think you should take this to the Spark developer list.

Two, I suspect broadcast variables aren't the best solution for the use
case you describe. Maybe an in-memory data/object/file store like Tachyon
is a better fit.

Thanks,

Tim


On Tue, May 2, 2017 at 11:56 AM, Nipun Arora 
wrote:

> Hi All,
>
> To support our Spark Streaming based anomaly detection tool, we have made
> a patch in Spark 1.6.2 to dynamically update broadcast variables.
>
> I'll first explain our use case, which I believe should be common to
> several people using Spark Streaming applications. Broadcast variables are
> often used to store values such as machine learning models, which can then
> be applied to streaming data to "test" and get the desired results (in our
> case, anomalies). Unfortunately, in current Spark, broadcast variables are
> final and can only be initialized once, before the initialization of the
> streaming context. Hence, if a new model is learned, the streaming system
> cannot be updated without shutting down the application, broadcasting
> again, and restarting the application. Our goal was to re-broadcast
> variables without requiring any downtime of the streaming service.
>
> The key to this implementation is a live re-broadcastVariable() interface,
> which can be triggered in between micro-batch executions, without any
> reboot required for the streaming application. At a high level, the task is
> done by re-fetching the broadcast variable information from the Spark driver
> and then re-distributing it to the workers. The micro-batch execution is
> blocked while the update is made, by taking a lock on the execution. We
> have already tested this in a prototype deployment of our anomaly
> detection service and can successfully re-broadcast the broadcast variables
> with no downtime.
>
> We would like to integrate these changes into Spark; can anyone please let
> me know the process for submitting patches/new features to Spark? Also, I
> understand that the current version of Spark is 2.1. However, our changes
> have been done and tested on Spark 1.6.2; will this be a problem?
>
> Thanks
> Nipun
>



-- 

--
Thanks,

Tim


[Spark Streaming] Dynamic Broadcast Variable Update

2017-05-02 Thread Nipun Arora
Hi All,

To support our Spark Streaming based anomaly detection tool, we have made a
patch in Spark 1.6.2 to dynamically update broadcast variables.

I'll first explain our use case, which I believe should be common to
several people using Spark Streaming applications. Broadcast variables are
often used to store values such as machine learning models, which can then
be applied to streaming data to "test" and get the desired results (in our
case, anomalies). Unfortunately, in current Spark, broadcast variables are
final and can only be initialized once, before the initialization of the
streaming context. Hence, if a new model is learned, the streaming system
cannot be updated without shutting down the application, broadcasting
again, and restarting the application. Our goal was to re-broadcast
variables without requiring any downtime of the streaming service.

The key to this implementation is a live re-broadcastVariable() interface,
which can be triggered in between micro-batch executions, without any
reboot required for the streaming application. At a high level, the task is
done by re-fetching the broadcast variable information from the Spark driver
and then re-distributing it to the workers. The micro-batch execution is
blocked while the update is made, by taking a lock on the execution. We
have already tested this in a prototype deployment of our anomaly
detection service and can successfully re-broadcast the broadcast variables
with no downtime.

We would like to integrate these changes into Spark; can anyone please let me
know the process for submitting patches/new features to Spark? Also, I
understand that the current version of Spark is 2.1. However, our changes
have been done and tested on Spark 1.6.2; will this be a problem?

Thanks
Nipun
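
For anyone hitting the same limitation before such a change lands upstream: a
commonly used workaround (not the patch described above) is to keep the
Broadcast handle in a driver-side holder that is unpersisted and re-created
between micro-batches. A minimal sketch, where the Map[String, Double] model
type and all names are illustrative:

import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

// Driver-side holder for a broadcast model that can be refreshed between batches.
object ModelHolder {
  @volatile private var bc: Broadcast[Map[String, Double]] = _

  def get(sc: SparkContext, load: () => Map[String, Double]): Broadcast[Map[String, Double]] = {
    if (bc == null) synchronized {
      if (bc == null) bc = sc.broadcast(load())
    }
    bc
  }

  // Call on the driver (e.g. inside foreachRDD) when a new model is available.
  def update(sc: SparkContext, newModel: Map[String, Double]): Unit = synchronized {
    if (bc != null) bc.unpersist(blocking = false)
    bc = sc.broadcast(newModel)
  }
}

Each batch fetches the handle on the driver (inside foreachRDD or transform)
and calls .value inside the closure, so subsequent batches pick up the
refreshed broadcast without restarting the streaming context.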


Re: do we need to enable writeAheadLogs for DirectStream as well or is it only for indirect stream?

2017-05-02 Thread Cody Koeninger
You don't need write ahead logs for direct stream.

On Tue, May 2, 2017 at 11:32 AM, kant kodali  wrote:
> Hi All,
>
> I need some fault tolerance for my stateful computations, and I am wondering
> why we need to enable writeAheadLogs for a DirectStream such as Kafka (for a
> receiver-based, indirect stream it makes sense). In case of a driver failure,
> a DirectStream such as Kafka can pull the messages again from the last
> committed offset, right?
>
> Thanks!
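
For the stateful part of the question, the usual pattern with the direct
approach is to rely on checkpointing rather than write-ahead logs, since the
records themselves can be re-read from Kafka after a failure. A minimal
sketch using the Kafka 0.10 direct API; the broker, topic, group id and
checkpoint path are illustrative:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

val checkpointDir = "hdfs:///checkpoints/my-stateful-app"

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("direct-stream-no-wal")
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint(checkpointDir) // needed for stateful ops and driver recovery

  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "broker:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "my-group",
    "enable.auto.commit" -> (false: java.lang.Boolean))

  val stream = KafkaUtils.createDirectStream[String, String](
    ssc, PreferConsistent, Subscribe[String, String](Seq("events"), kafkaParams))

  // Offsets live in the checkpointed DStream metadata; after a driver failure
  // the lost batches are recomputed by re-reading Kafka, so no WAL is needed.
  stream.map(_.value).countByWindow(Seconds(60), Seconds(10)).print()
  ssc
}

val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()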




Re: Initialize Gaussian Mixture Model using Spark ML dataframe API

2017-05-02 Thread Tim Smith
Yes, I noticed these open issues, both with KMeans and GMM:
https://issues.apache.org/jira/browse/SPARK-13025

Thanks,

Tim


On Mon, May 1, 2017 at 9:01 PM, Yanbo Liang  wrote:

> Hi Tim,
>
> The Spark ML API doesn't currently support setting an initial model for GMM.
> I hope we can get this feature in Spark 2.3.
>
> Thanks
> Yanbo
>
> On Fri, Apr 28, 2017 at 1:46 AM, Tim Smith  wrote:
>
>> Hi,
>>
>> I am trying to figure out the API to initialize a Gaussian mixture model
>> using either centroids created by K-means or a previously calculated GMM
>> model (I am aware that you can "save" a model and "load" it later, but I am
>> not interested in saving a model to a filesystem).
>>
>> The Spark MLlib API lets you do this using setInitialModel:
>> https://spark.apache.org/docs/2.1.0/api/scala/index.html#org.apache.spark.mllib.clustering.GaussianMixture
>>
>> However, I cannot figure out how to do this using the Spark ML API. Can
>> anyone please point me in the right direction? I've tried reading the Spark
>> ML code and was wondering if the "set" call lets you do that.
>>
>> --
>> Thanks,
>>
>> Tim
>>
>
>


-- 

--
Thanks,

Tim
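
Until the spark.ml estimator exposes an initial-model param, the RDD-based
MLlib API is the place where seeding works today. A rough sketch of seeding a
GMM with K-means centroids via setInitialModel, assuming an RDD[Vector] of
features; the unit covariances and uniform weights are naive illustrative
choices:

import org.apache.spark.mllib.clustering.{GaussianMixture, GaussianMixtureModel, KMeans}
import org.apache.spark.mllib.linalg.{DenseMatrix, Vector}
import org.apache.spark.mllib.stat.distribution.MultivariateGaussian
import org.apache.spark.rdd.RDD

def seededGmm(data: RDD[Vector], k: Int): GaussianMixtureModel = {
  val kmeans = KMeans.train(data, k, 20)                 // centroids become starting means
  val dim = kmeans.clusterCenters.head.size
  val gaussians = kmeans.clusterCenters.map(c =>
    new MultivariateGaussian(c, DenseMatrix.eye(dim)))   // unit covariance as a crude prior
  val initial = new GaussianMixtureModel(Array.fill(k)(1.0 / k), gaussians) // uniform weights
  new GaussianMixture().setK(k).setInitialModel(initial).run(data)
}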


Re: Driver spins hours in query plan optimization

2017-05-02 Thread Everett Anderson
It seems like

https://issues.apache.org/jira/browse/SPARK-13346

is likely the same issue.

It also seems that for some people persist() doesn't work, and they have to
convert to RDDs and back.


On Fri, Apr 14, 2017 at 1:39 PM, Everett Anderson  wrote:

> Hi,
>
> We keep hitting a situation on Spark 2.0.2 (we haven't tested later versions
> yet) where the driver spins forever, seemingly in query plan optimization,
> for moderate queries such as the union of a few (~5) other DataFrames.
>
> We can see the driver spinning with one core in the nioEventLoopGroup-2-2
> thread in a deep trace like the attached.
>
> Throwing in a MEMORY_AND_DISK persist() so the query plan is collapsed
> works around this, but it's a little surprising how often we encounter the
> problem, forcing us to manage persisting/unpersisting tables and
> potentially suffer unnecessary disk I/O.
>
> I've looked through JIRA but don't see open issues about this -- I might've
> just not found them.
>
> Anyone else encounter this?
>
>
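
For reference, the RDD round-trip mentioned above is essentially a one-liner:
re-creating the DataFrame from its RDD and schema truncates the logical plan,
so the optimizer no longer re-analyzes the whole upstream lineage. A sketch,
assuming Spark 2.x:

import org.apache.spark.sql.DataFrame

// Cuts the logical plan at this point. Unlike persist(), nothing is cached,
// so the upstream plan is recomputed if the result is reused more than once.
def truncatePlan(df: DataFrame): DataFrame =
  df.sparkSession.createDataFrame(df.rdd, df.schema)

For example, truncatePlan(dfs.reduce(_ union _)) keeps the planner from
re-walking the lineage of every input on each subsequent transformation.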


Re: Schema Evolution for nested Dataset[T]

2017-05-02 Thread Mike Wheeler
Hi Michael,

Thank you for the suggestions. I am wondering how I can get `withColumn`
to handle a nested structure.

For example, below is my code to generate the data. I basically add the
`age` field to `Person2`, which is nested in an Array for Course2. Then I
want to fill in 0 for age where age is null.

case class Person1(name: String)
case class Person2(name: String, age: Int)
case class Course1(id: Int, students: Array[Person1])
case class Course2(id: Int, students: Array[Person2])

Seq(Course1(10, Array(Person1("a"), Person1("b")))).toDF.write.parquet("data1")
Seq(Course2(20, Array(Person2("c", 20), Person2("d", 10)))).toDF.write.parquet("data2")

val allData = spark.read.option("mergeSchema", "true").parquet("data1", "data2")
allData.show

+---+--------------------+
| id|            students|
+---+--------------------+
| 20|    [[c,20], [d,10]]|
| 10|[[a,null], [b,null]]|
+---+--------------------+



*My first try:*

allData.withColumn("students.age", coalesce($"students.age", lit(0)))

It returns the exception:

org.apache.spark.sql.AnalysisException: cannot resolve
'coalesce(`students`.`age`, 0)' due to data type mismatch: input to
function coalesce should all be the same type, but it's [array<int>, int];;



*My second try: *

allData.withColumn("students.age", coalesce($"students.age", array(lit(0),
lit(0.show


+---+--------------------+------------+
| id|            students|students.age|
+---+--------------------+------------+
| 20|    [[c,20], [d,10]]|    [20, 10]|
| 10|[[a,null], [b,null]]|[null, null]|
+---+--------------------+------------+

It creates a new top-level column named "students.age" instead of imputing the
age value nested in students.

Thank you very much in advance.

Mike




On Mon, May 1, 2017 at 10:31 AM, Michael Armbrust 
wrote:

> Oh, and if you want a default other than null:
>
> import org.apache.spark.sql.functions._
> df.withColumn("address", coalesce($"address", lit())
>
> On Mon, May 1, 2017 at 10:29 AM, Michael Armbrust 
> wrote:
>
>> The following should work:
>>
>> val schema = implicitly[org.apache.spark.sql.Encoder[Course]].schema
>> spark.read.schema(schema).parquet("data.parquet").as[Course]
>>
>> Note this will only work for nullable fields (i.e. if you add a primitive
>> like Int you need to make it an Option[Int])
>>
>> On Sun, Apr 30, 2017 at 9:12 PM, Mike Wheeler <
>> rotationsymmetr...@gmail.com> wrote:
>>
>>> Hi Spark Users,
>>>
>>> Suppose I have some data (stored in parquet for example) generated as
>>> below:
>>>
>>> package com.company.entity.old
>>> case class Course(id: Int, students: List[Student])
>>> case class Student(name: String)
>>>
>>> Then usually I can access the data by
>>>
>>> spark.read.parquet("data.parquet").as[Course]
>>>
>>> Now I want to add a new field `address` to Student:
>>>
>>> package com.company.entity.new
>>> case class Course(id: Int, students: List[Student])
>>> case class Student(name: String, address: String)
>>>
>>> Then obviously running `spark.read.parquet("data.parquet").as[Course]`
>>> on data generated by the old entity/schema will fail because `address`
>>> is missing.
>>>
>>> In this case, what is the best practice for reading data generated with
>>> the old entity/schema into the new entity/schema, with the missing field
>>> set to some default value? I know I can manually write a function to
>>> do the transformation from the old to the new, but it is kind of
>>> tedious. Are there any automatic methods?
>>>
>>> Thanks,
>>>
>>> Mike
>>>
>>>
>>>
>>
>
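
One way around the withColumn limitation, staying with the example in this
thread: read the merged data with age as Option[Int] and impute with a typed
map, instead of trying to rewrite the nested column in place. A sketch only
(the case class names here are made up), assuming the allData DataFrame built
above and a SparkSession named spark:

// Case classes matching the merged schema, with the nullable field as Option.
case class PersonOpt(name: String, age: Option[Int])
case class CourseOpt(id: Int, students: Seq[PersonOpt])

import spark.implicits._

val filled = allData.as[CourseOpt].map { c =>
  c.copy(students = c.students.map(s => s.copy(age = Some(s.age.getOrElse(0)))))
}
filled.show(false)

The typed map pays the cost of deserializing each row into case classes, but
it keeps the imputation logic readable for deeply nested fields.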


Re: Joins in Spark

2017-05-02 Thread Angel Francisco Orta
Have you tried partitioning by the join field and running the join in segments,
filtering both tables to the same segment of data?

Example:

val tableA = DfA.repartition($"joinField").filter("firstSegment")
val tableB = DfB.repartition($"joinField").filter("firstSegment")

tableA.join(tableB)

On May 2, 2017 8:30 PM, "KhajaAsmath Mohammed" wrote:

> Table 1 (192 GB) is partitioned by year and month; the 192 GB of data is
> for one month, i.e., April.
>
> Table 2: 92 GB, not partitioned.
>
> I have to perform a join on these tables now.
>
>
>
> On Tue, May 2, 2017 at 1:27 PM, Angel Francisco Orta <
> angel.francisco.o...@gmail.com> wrote:
>
>> Hello,
>>
>> Are the tables partitioned?
>> If yes, what is the partition field?
>>
>> Thanks
>>
>>
>> On May 2, 2017 8:22 PM, "KhajaAsmath Mohammed" <mdkhajaasm...@gmail.com> wrote:
>>
>> Hi,
>>
>> I am trying to join two big tables in Spark, and the job has been running
>> for quite a long time without any results.
>>
>> Table 1: 192GB
>> Table 2: 92 GB
>>
>> Does anyone have a better solution to get the results faster?
>>
>> Thanks,
>> Asmath
>>
>>
>>
>


Re: Joins in Spark

2017-05-02 Thread KhajaAsmath Mohammed
Table 1 (192 GB) is partitioned by year and month; the 192 GB of data is for
one month, i.e., April.

Table 2: 92 GB, not partitioned.

I have to perform a join on these tables now.



On Tue, May 2, 2017 at 1:27 PM, Angel Francisco Orta <
angel.francisco.o...@gmail.com> wrote:

> Hello,
>
> Are the tables partitioned?
> If yes, what is the partition field?
>
> Thanks
>
>
> On May 2, 2017 8:22 PM, "KhajaAsmath Mohammed" wrote:
>
> Hi,
>
> I am trying to join two big tables in Spark, and the job has been running
> for quite a long time without any results.
>
> Table 1: 192GB
> Table 2: 92 GB
>
> Does anyone have a better solution to get the results faster?
>
> Thanks,
> Asmath
>
>
>


Re: Joins in Spark

2017-05-02 Thread Angel Francisco Orta
Hello,

Are the tables partitioned?
If yes, what is the partition field?

Thanks


On May 2, 2017 8:22 PM, "KhajaAsmath Mohammed" wrote:

Hi,

I am trying to join two big tables in Spark, and the job has been running for
quite a long time without any results.

Table 1: 192GB
Table 2: 92 GB

Does anyone have a better solution to get the results faster?

Thanks,
Asmath


Joins in Spark

2017-05-02 Thread KhajaAsmath Mohammed
Hi,

I am trying to join two big tables in Spark, and the job has been running for
quite a long time without any results.

Table 1: 192GB
Table 2: 92 GB

Does anyone have a better solution to get the results faster?

Thanks,
Asmath


do we need to enable writeAheadLogs for DirectStream as well or is it only for indirect stream?

2017-05-02 Thread kant kodali
Hi All,

I need some fault tolerance for my stateful computations, and I am wondering
why we need to enable writeAheadLogs for a DirectStream such as Kafka (for a
receiver-based, indirect stream it makes sense). In case of a driver failure,
a DirectStream such as Kafka can pull the messages again from the last
committed offset, right?

Thanks!


Re: --jars does not take remote jar?

2017-05-02 Thread Nan Zhu
I see. Thanks!

On Tue, May 2, 2017 at 9:12 AM, Marcelo Vanzin  wrote:

> On Tue, May 2, 2017 at 9:07 AM, Nan Zhu  wrote:
> > I have no easy way to pass a jar path to those forked Spark
> > applications? (except that I download the jar from a remote path to a
> > local temp dir after resolving some permission issues, etc.?)
>
> Yes, that's the only way currently in client mode.
>
> --
> Marcelo
>


Re: --jars does not take remote jar?

2017-05-02 Thread Marcelo Vanzin
On Tue, May 2, 2017 at 9:07 AM, Nan Zhu  wrote:
> I have no easy way to pass a jar path to those forked Spark
> applications? (except that I download the jar from a remote path to a local
> temp dir after resolving some permission issues, etc.?)

Yes, that's the only way currently in client mode.

-- 
Marcelo




Re: --jars does not take remote jar?

2017-05-02 Thread Nan Zhu
Thanks for the reply! If I have an application master which starts some
Spark applications by forking processes (in yarn-client mode), then
essentially I have no easy way to pass a jar path to those forked Spark
applications? (except that I download the jar from a remote path to a local
temp dir after resolving some permission issues, etc.?)

On Tue, May 2, 2017 at 9:00 AM, Marcelo Vanzin  wrote:

> Remote jars are added to executors' classpaths, but not the driver's.
> In YARN cluster mode, they would also be added to the driver's class
> path.
>
> On Tue, May 2, 2017 at 8:43 AM, Nan Zhu  wrote:
> > Hi, all
> >
> > For some reason, I tried to pass in an HDFS path to the --jars option in
> > spark-submit.
> >
> > According to the documentation,
> > http://spark.apache.org/docs/latest/submitting-applications.html#advanced-dependency-management,
> > --jars should accept a remote path.
> >
> > However, in the implementation,
> > https://github.com/apache/spark/blob/c622a87c44e0621e1b3024fdca9b2aa3c508615b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L757,
> > it does not look like it does.
> >
> > Did I miss anything?
> >
> > Best,
> >
> > Nan
>
>
>
> --
> Marcelo
>


Re: --jars does not take remote jar?

2017-05-02 Thread Marcelo Vanzin
Remote jars are added to executors' classpaths, but not the driver's.
In YARN cluster mode, they would also be added to the driver's class
path.

On Tue, May 2, 2017 at 8:43 AM, Nan Zhu  wrote:
> Hi, all
>
> For some reason, I tried to pass in an HDFS path to the --jars option in
> spark-submit.
>
> According to the documentation,
> http://spark.apache.org/docs/latest/submitting-applications.html#advanced-dependency-management,
> --jars should accept a remote path.
>
> However, in the implementation,
> https://github.com/apache/spark/blob/c622a87c44e0621e1b3024fdca9b2aa3c508615b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L757,
> it does not look like it does.
>
> Did I miss anything?
>
> Best,
>
> Nan



-- 
Marcelo




--jars does not take remote jar?

2017-05-02 Thread Nan Zhu
Hi, all

For some reason, I tried to pass in an HDFS path to the --jars option in
spark-submit.

According to the documentation,
http://spark.apache.org/docs/latest/submitting-applications.html#advanced-dependency-management,
--jars should accept a remote path.

However, in the implementation,
https://github.com/apache/spark/blob/c622a87c44e0621e1b3024fdca9b2aa3c508615b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L757,
it does not look like it does.

Did I miss anything?

Best,

Nan


OutOfMemoryError

2017-05-02 Thread TwUxTLi51Nus

Hi Spark Users,

I have a dataset with ~5M rows x 20 columns, containing a groupID and a
rowID. My goal is to check whether (some) columns contain more than a
fixed fraction (say, 50%) of missing (null) values within a group. If so,
the entire column is set to missing (null) for that group.


The Problem:
The loop runs like a charm during the first iterations, but towards the
end, around the 6th or 7th iteration, I see my CPU utilization dropping
(using 1 instead of 6 cores). Along with that, the execution time for one
iteration increases significantly. At some point, I get an OutOfMemoryError:

* spark.driver.memory < 4G: at collect() (FAIL 1)
* 4G <= spark.driver.memory < 10G: at the count() step (FAIL 2)

Enabling a HeapDump on OOM (and analyzing it with Eclipse MAT) showed 
two classes taking up lots of memory:


* java.lang.Thread
  - char (2G)
  - scala.collection.IndexedSeqLike
  - scala.collection.mutable.WrappedArray (1G)
  - java.lang.String (1G)

* org.apache.spark.sql.execution.ui.SQLListener
  - org.apache.spark.sql.execution.ui.SQLExecutionUIData
(various of up to 1G in size)
  - java.lang.String
  - ...

Turning off the SparkUI and/or setting spark.ui.retainedXXX to something 
low (e.g. 1) did not solve the issue.


Any idea what I am doing wrong? Or is this a bug?

My code can be found as a GitHub Gist [0]. More details can be found in
the StackOverflow question [1] I posted, which has not received any answers
so far.


Thanks!

[0] 
https://gist.github.com/TwUxTLi51Nus/4accdb291494be9201abfad72541ce74
[1] 
http://stackoverflow.com/questions/43637913/apache-spark-outofmemoryerror-heapspace


PS: As a workaround, I have been writing and reading temporary parquet 
files on each loop iteration.
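
In case it helps with the same pattern: the per-group null-fraction check can
be expressed as a single aggregation plus a join, with no collect() on the
driver, which sidesteps the driver-side accumulation described above. A
sketch only, assuming an input DataFrame df with a groupID column; the
candidate column names and the 50% threshold are illustrative:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

val threshold = 0.5
val candidateCols = Seq("colA", "colB") // placeholders for the real columns

// One pass: per-group fraction of nulls for every candidate column.
val nullFracs = candidateCols.map { c =>
  (sum(when(col(c).isNull, 1).otherwise(0)) / count(lit(1))).alias(s"${c}_nullFrac")
}
val fracsByGroup = df.groupBy("groupID").agg(nullFracs.head, nullFracs.tail: _*)

// Join the fractions back and null out columns that exceed the threshold.
val result: DataFrame = candidateCols.foldLeft(df.join(fracsByGroup, Seq("groupID"))) {
  (acc, c) =>
    acc.withColumn(c, when(col(s"${c}_nullFrac") > threshold, lit(null)).otherwise(col(c)))
      .drop(s"${c}_nullFrac")
}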



--
Tw UxTLi51Nus
Email: twuxtli51...@posteo.de
