Is there a list of missing optimizations for typed functions?

2017-02-22 Thread Justin Pihony
I was curious how much introspection Spark does on typed functions, so I ran
the following two queries:

ds.where($"col" > 1).explain
ds.filter(_.col > 1).explain

And found that the typed function does NOT result in a PushedFilter. I
imagine this is because Spark has only a limited view into the function, so I
really have two questions:

1.) Is there a list of the methods that lose some of the optimizations that
you get from non-functional methods? Is it any method that accepts a generic
function?
2.) Is there any work to attempt reflection and gain some of these
optimizations back? I couldn't find anything in JIRA.
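
For reference, here is a minimal sketch of the comparison (the schema, path, and
session setup are hypothetical; only the two explain calls above come from my
actual run):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("pushdown-check").getOrCreate()
import spark.implicits._

case class Rec(col: Long)
val ds = spark.read.parquet("/tmp/recs").as[Rec]

// Untyped predicate: Catalyst can inspect it, so a Parquet source reports it
// under PushedFilters in the physical plan.
ds.where($"col" > 1).explain()

// Typed predicate: the lambda is opaque to Catalyst, so the plan shows a
// deserialize-then-filter step and no PushedFilters entry.
ds.filter(_.col > 1).explain()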

Thanks,
Justin Pihony



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-a-list-of-missing-optimizations-for-typed-functions-tp28418.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Practical configuration to run LSH in Spark 2.1.0

2017-02-22 Thread Nick Pentreath
And to be clear, are you doing a self-join for approx similarity? Or
joining to another dataset?



On Thu, 23 Feb 2017 at 02:01, nguyen duc Tuan  wrote:

> Hi Seth,
> Here are the parameters that I used in my experiments:
> - Number of executors: 16
> - Executor memory: varied from 1G -> 2G -> 3G
> - Number of cores per executor: 1 -> 2
> - Driver memory: 1G -> 2G -> 3G
> - Similarity threshold: 0.6
> MinHash:
> - Number of hash tables: 2
> SignedRandomProjection:
> - Number of hash tables: 2
>
> 2017-02-23 0:13 GMT+07:00 Seth Hendrickson :
>
> I'm looking into this a bit further, thanks for bringing it up! Right now
> the LSH implementation only uses OR-amplification. The practical
> consequence of this is that it will select too many candidates when doing
> approximate near neighbor search and approximate similarity join. When we
> add AND-amplification I think it will become significantly more usable. In
> the meantime, I will also investigate scalability issues.
>
> Can you please provide every parameter you used? It will be very helfpul
> :) For instance, the similarity threshold, the number of hash tables, the
> bucket width, etc...
>
> Thanks!
>
> On Mon, Feb 13, 2017 at 3:21 PM, Nick Pentreath 
> wrote:
>
> The original Uber authors provided this performance test result:
> https://docs.google.com/document/d/19BXg-67U83NVB3M0I84HVBVg3baAVaESD_mrg_-vLro
>
> This was for MinHash only though, so it's not clear about what the
> scalability is for the other metric types.
>
> The SignRandomProjectionLSH is not yet in Spark master (see
> https://issues.apache.org/jira/browse/SPARK-18082). It could be there are
> some implementation details that would make a difference here.
>
> By the way, what is the join threshold you use in approx join?
>
> Could you perhaps create a JIRA ticket with the details in order to track
> this?
>
>
> On Sun, 12 Feb 2017 at 22:52 nguyen duc Tuan  wrote:
>
> After all, I switched back to LSH implementation that I used before (
> https://github.com/karlhigley/spark-neighbors ). I can run on my dataset
> now. If someone has any suggestion, please tell me.
> Thanks.
>
> 2017-02-12 9:25 GMT+07:00 nguyen duc Tuan :
>
> Hi Timur,
> 1) Our data is transformed to dataset of Vector already.
> 2) If I use RandomSignProjectLSH, the job dies after I call
> approximateSimilarJoin. I tried to use Minhash instead, the job is still
> slow. I don't thinks the problem is related to the GC. The time for GC is
> small compare with the time for computation. Here is some screenshots of my
> job.
> Thanks
>
> 2017-02-12 8:01 GMT+07:00 Timur Shenkao :
>
> Hello,
>
> 1) Are you sure that your data is "clean"?  No unexpected missing values?
> No strings in unusual encoding? No additional or missing columns ?
> 2) How long does your job run? What about garbage collector parameters?
> Have you checked what happens with jconsole / jvisualvm ?
>
> Sincerely yours, Timur
>
> On Sat, Feb 11, 2017 at 12:52 AM, nguyen duc Tuan 
> wrote:
>
> Hi Nick,
> Because we use *RandomSignProjectionLSH*, there is only one parameter for
> LSH is the number of hashes. I try with small number of hashes (2) but the
> error is still happens. And it happens when I call similarity join. After
> transformation, the size of  dataset is about 4G.
>
> 2017-02-11 3:07 GMT+07:00 Nick Pentreath :
>
> What other params are you using for the lsh transformer?
>
> Are the issues occurring during transform or during the similarity join?
>
>
> On Fri, 10 Feb 2017 at 05:46, nguyen duc Tuan 
> wrote:
>
> hi Das,
> In general, I will apply them to larger datasets, so I want to use LSH,
> which is more scaleable than the approaches as you suggested. Have you
> tried LSH in Spark 2.1.0 before ? If yes, how do you set the
> parameters/configuration to make it work ?
> Thanks.
>
> 2017-02-10 19:21 GMT+07:00 Debasish Das :
>
> If it is 7m rows and 700k features (or say 1m features) brute force row
> similarity will run fine as well...check out spark-4823...you can compare
> quality with approximate variant...
> On Feb 9, 2017 2:55 AM, "nguyen duc Tuan"  wrote:
>
> Hi everyone,
> Since spark 2.1.0 introduces LSH (
> http://spark.apache.org/docs/latest/ml-features.html#locality-sensitive-hashing),
> we want to use LSH to find approximately nearest neighbors. Basically, We
> have dataset with about 7M rows. we want to use cosine distance to meassure
> the similarity between items, so we use *RandomSignProjectionLSH* (
> https://gist.github.com/tuan3w/c968e56ea8ef135096eeedb08af097db) instead
> of *BucketedRandomProjectionLSH*. I try to tune some configurations such
> as serialization, memory fraction, executor memory (~6G), number of
> executors ( ~20), memory overhead ..., but nothing works. I 

Is there any limit on number of tasks per stage attempt?

2017-02-22 Thread Parag Chaudhari
Hi,

Is there any limit on number of tasks per stage attempt?


*Thanks,*

*​Parag​*


Re: Why spark history server does not show RDD even if it is persisted?

2017-02-22 Thread Parag Chaudhari
Thanks!

If Spark does not log these events in the event log, then why does the Spark
history server provide an API to get RDD information?

From the documentation,

/applications/[app-id]/storage/rdd   A list of stored RDDs for the given
application.

/applications/[app-id]/storage/rdd/[rdd-id]   Details for the storage
status of a given RDD.




*Thanks,*
*Parag Chaudhari*
*USC Alumnus (Fight On!)*
*Mobile: (213)-572-7858*
*Profile: http://www.linkedin.com/pub/parag-chaudhari/28/a55/254*


On Wed, Feb 22, 2017 at 7:44 PM, Saisai Shao  wrote:

> It is too verbose, and will significantly increase the size of the event log.
>
> Here is the comment in the code:
>
> // No-op because logging every update would be overkill
>> override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {}
>>
>>
> On Thu, Feb 23, 2017 at 11:42 AM, Parag Chaudhari 
> wrote:
>
>> Thanks a lot the information!
>>
>> Is there any reason why EventLoggingListener ignore this event?
>>
>> *Thanks,*
>>
>>
>> *​Parag​*
>>
>> On Wed, Feb 22, 2017 at 7:11 PM, Saisai Shao 
>> wrote:
>>
>>> AFAIK, Spark's EventLoggingListerner ignores BlockUpdate event, so it
>>> will not be written into event-log, I think that's why you cannot get such
>>> info in history server.
>>>
>>> On Thu, Feb 23, 2017 at 9:51 AM, Parag Chaudhari 
>>> wrote:
>>>
 Hi,

 I am running spark shell in spark version 2.0.2. Here is my program,

 var myrdd = sc.parallelize(Array.range(1, 10))
 myrdd.setName("test")
 myrdd.cache
 myrdd.collect

 But I am not able to see any RDD info in "storage" tab in spark history
 server.

 I looked at this
 
 but it is not helping as I have exact similar program mentioned there. Can
 anyone help?


 *Thanks,*

 *​Parag​*

>>>
>>>
>>
>


DataframeWriter - How to change filename extension

2017-02-22 Thread Nirav Patel
Hi,

I am writing Dataframe as TSV using DataframeWriter as follows:

myDF.write.mode("overwrite").option("sep","\t").csv("/out/path")

The problem is that all part files have a .csv extension instead of .tsv, as follows:
part-r-00012-f9f06712-1648-4eb6-985b-8a9c79267eef.csv

All the records are stored in TSV format though.

How can I change the extension to .tsv as well?
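
The only workaround I can think of is a rename pass after the write (a sketch using
the Hadoop FileSystem API; "spark" here is the active SparkSession), but I would
prefer a writer option if one exists:

import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
fs.listStatus(new Path("/out/path"))
  .map(_.getPath)
  .filter(_.getName.endsWith(".csv"))
  .foreach { p =>
    // Rename part-...csv to part-...tsv in place.
    fs.rename(p, new Path(p.getParent, p.getName.stripSuffix(".csv") + ".tsv"))
  }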

Thanks

-- 





Re: Why spark history server does not show RDD even if it is persisted?

2017-02-22 Thread Saisai Shao
It is too verbose, and will significantly increase the size of the event log.

Here is the comment in the code:

// No-op because logging every update would be overkill
> override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {}
>
>
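
If you still need block-level information, a minimal sketch of a custom listener
that surfaces the events the event log drops (hypothetical class name; it has to be
attached to a live SparkContext, since the history server will never see these
events):

import org.apache.spark.scheduler.{SparkListener, SparkListenerBlockUpdated}

class BlockUpdateLogger extends SparkListener {
  // Print each block update instead of dropping it like EventLoggingListener does.
  override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {
    val info = event.blockUpdatedInfo
    println(s"block=${info.blockId} host=${info.blockManagerId.host} " +
      s"mem=${info.memSize} disk=${info.diskSize}")
  }
}

// sc.addSparkListener(new BlockUpdateLogger())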
On Thu, Feb 23, 2017 at 11:42 AM, Parag Chaudhari 
wrote:

> Thanks a lot the information!
>
> Is there any reason why EventLoggingListener ignore this event?
>
> *Thanks,*
>
>
> *​Parag​*
>
> On Wed, Feb 22, 2017 at 7:11 PM, Saisai Shao 
> wrote:
>
>> AFAIK, Spark's EventLoggingListerner ignores BlockUpdate event, so it
>> will not be written into event-log, I think that's why you cannot get such
>> info in history server.
>>
>> On Thu, Feb 23, 2017 at 9:51 AM, Parag Chaudhari 
>> wrote:
>>
>>> Hi,
>>>
>>> I am running spark shell in spark version 2.0.2. Here is my program,
>>>
>>> var myrdd = sc.parallelize(Array.range(1, 10))
>>> myrdd.setName("test")
>>> myrdd.cache
>>> myrdd.collect
>>>
>>> But I am not able to see any RDD info in "storage" tab in spark history
>>> server.
>>>
>>> I looked at this
>>> 
>>> but it is not helping as I have exact similar program mentioned there. Can
>>> anyone help?
>>>
>>>
>>> *Thanks,*
>>>
>>> *​Parag​*
>>>
>>
>>
>


Re: Why spark history server does not show RDD even if it is persisted?

2017-02-22 Thread Parag Chaudhari
Thanks a lot for the information!

Is there any reason why EventLoggingListener ignores this event?

*Thanks,*


*​Parag​*

On Wed, Feb 22, 2017 at 7:11 PM, Saisai Shao  wrote:

> AFAIK, Spark's EventLoggingListerner ignores BlockUpdate event, so it will
> not be written into event-log, I think that's why you cannot get such info
> in history server.
>
> On Thu, Feb 23, 2017 at 9:51 AM, Parag Chaudhari 
> wrote:
>
>> Hi,
>>
>> I am running spark shell in spark version 2.0.2. Here is my program,
>>
>> var myrdd = sc.parallelize(Array.range(1, 10))
>> myrdd.setName("test")
>> myrdd.cache
>> myrdd.collect
>>
>> But I am not able to see any RDD info in "storage" tab in spark history
>> server.
>>
>> I looked at this
>> 
>> but it is not helping as I have exact similar program mentioned there. Can
>> anyone help?
>>
>>
>> *Thanks,*
>>
>> *​Parag​*
>>
>
>


Re: Why spark history server does not show RDD even if it is persisted?

2017-02-22 Thread Saisai Shao
AFAIK, Spark's EventLoggingListener ignores the BlockUpdated event, so it will
not be written into the event log. I think that's why you cannot see such info
in the history server.

On Thu, Feb 23, 2017 at 9:51 AM, Parag Chaudhari 
wrote:

> Hi,
>
> I am running spark shell in spark version 2.0.2. Here is my program,
>
> var myrdd = sc.parallelize(Array.range(1, 10))
> myrdd.setName("test")
> myrdd.cache
> myrdd.collect
>
> But I am not able to see any RDD info in "storage" tab in spark history
> server.
>
> I looked at this
> 
> but it is not helping as I have exact similar program mentioned there. Can
> anyone help?
>
>
> *Thanks,*
>
> *​Parag​*
>


Why spark history server does not show RDD even if it is persisted?

2017-02-22 Thread Parag Chaudhari
Hi,

I am running the Spark shell on Spark 2.0.2. Here is my program:

var myrdd = sc.parallelize(Array.range(1, 10))
myrdd.setName("test")
myrdd.cache
myrdd.collect

But I am not able to see any RDD info in the "Storage" tab of the Spark history
server.

I looked at this
but it did not help, as I have essentially the same program as mentioned there. Can
anyone help?


*Thanks,*

*​Parag​*


Re: Why does Spark Streaming application with Kafka fail with “requirement failed: numRecords must not be negative”?

2017-02-22 Thread Cody Koeninger
If you're talking about the version of scala used to build the broker,
that shouldn't matter.
If you're talking about the version of scala used for the kafka client
dependency, it shouldn't have compiled at all to begin with.
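
As a sanity check along the lines suggested further down in this thread, a minimal
sketch of a direct stream over the same topic with no checkpointing (broker and
topic names are hypothetical; assumes an existing SparkContext `sc` and the
kafka-0-8 integration):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(sc, Seconds(5))
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("test-topic"))

// Just print a few records per batch; no checkpointing, no writes back to Kafka.
stream.foreachRDD(rdd => rdd.take(10).foreach(println))

ssc.start()
ssc.awaitTermination()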

On Wed, Feb 22, 2017 at 12:11 PM, Muhammad Haseeb Javed
<11besemja...@seecs.edu.pk> wrote:
> I just noticed that Spark version that I am using (2.0.2) is built with
> Scala 2.11. However I am using Kafka 0.8.2 built with Scala 2.10. Could this
> be the reason why we are getting this error?
>
> On Mon, Feb 20, 2017 at 5:50 PM, Cody Koeninger  wrote:
>>
>> So there's no reason to use checkpointing at all, right?  Eliminate
>> that as a possible source of problems.
>>
>> Probably unrelated, but this also isn't a very good way to benchmark.
>> Kafka producers are threadsafe, there's no reason to create one for
>> each partition.
>>
>> On Mon, Feb 20, 2017 at 4:43 PM, Muhammad Haseeb Javed
>> <11besemja...@seecs.edu.pk> wrote:
>> > This is the code that I have been trying is giving me this error. No
>> > complicated operation being performed on the topics as far as I can see.
>> >
>> > class Identity() extends BenchBase {
>> >
>> >
>> >   override def process(lines: DStream[(Long, String)], config:
>> > SparkBenchConfig): Unit = {
>> >
>> > val reportTopic = config.reporterTopic
>> >
>> > val brokerList = config.brokerList
>> >
>> >
>> > lines.foreachRDD(rdd => rdd.foreachPartition( partLines => {
>> >
>> >   val reporter = new KafkaReporter(reportTopic, brokerList)
>> >
>> >   partLines.foreach{ case (inTime , content) =>
>> >
>> > val outTime = System.currentTimeMillis()
>> >
>> > reporter.report(inTime, outTime)
>> >
>> > if(config.debugMode) {
>> >
>> >   println("Event: " + inTime + ", " + outTime)
>> >
>> > }
>> >
>> >   }
>> >
>> > }))
>> >
>> >   }
>> >
>> > }
>> >
>> >
>> > On Mon, Feb 20, 2017 at 3:10 PM, Cody Koeninger 
>> > wrote:
>> >>
>> >> That's an indication that the beginning offset for a given batch is
>> >> higher than the ending offset, i.e. something is seriously wrong.
>> >>
>> >> Are you doing anything at all odd with topics, i.e. deleting and
>> >> recreating them, using compacted topics, etc?
>> >>
>> >> Start off with a very basic stream over the same kafka topic that just
>> >> does foreach println or similar, with no checkpointing at all, and get
>> >> that working first.
>> >>
>> >> On Mon, Feb 20, 2017 at 12:10 PM, Muhammad Haseeb Javed
>> >> <11besemja...@seecs.edu.pk> wrote:
>> >> > Update: I am using Spark 2.0.2 and  Kafka 0.8.2 with Scala 2.10
>> >> >
>> >> > On Mon, Feb 20, 2017 at 1:06 PM, Muhammad Haseeb Javed
>> >> > <11besemja...@seecs.edu.pk> wrote:
>> >> >>
>> >> >> I am PhD student at Ohio State working on a study to evaluate
>> >> >> streaming
>> >> >> frameworks (Spark Streaming, Storm, Flink) using the the Intel
>> >> >> HiBench
>> >> >> benchmarks. But I think I am having a problem  with Spark. I have
>> >> >> Spark
>> >> >> Streaming application which I am trying to run on a 5 node cluster
>> >> >> (including master). I have 2 zookeeper and 4 kafka brokers. However,
>> >> >> whenever I run a Spark Streaming application I encounter the
>> >> >> following
>> >> >> error:
>> >> >>
>> >> >> java.lang.IllegalArgumentException: requirement failed: numRecords
>> >> >> must
>> >> >> not be negative
>> >> >> at scala.Predef$.require(Predef.scala:224)
>> >> >> at
>> >> >>
>> >> >>
>> >> >> org.apache.spark.streaming.scheduler.StreamInputInfo.(InputInfoTracker.scala:38)
>> >> >> at
>> >> >>
>> >> >>
>> >> >> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:165)
>> >> >> at
>> >> >>
>> >> >>
>> >> >> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>> >> >> at
>> >> >>
>> >> >>
>> >> >> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>> >> >> at
>> >> >> scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>> >> >> at
>> >> >>
>> >> >>
>> >> >> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>> >> >> at
>> >> >>
>> >> >>
>> >> >> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>> >> >> at
>> >> >>
>> >> >>
>> >> >> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
>> >> >> at
>> >> >>
>> >> >>
>> >> >> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
>> >> >> at
>> >> >>
>> >> >>
>> >> >> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
>> >> >> at scala.Option.orElse(Option.scala:289)
>> >> >>
>> >> >> The application 

RDD blocks on Spark Driver

2017-02-22 Thread prithish
 
 
Hello,

Had a question. When I look at the executors tab in the Spark UI, I notice that
some RDD blocks are assigned to the driver as well. Can someone please tell me
why?

Thanks for the help.

Re: Practical configuration to run LSH in Spark 2.1.0

2017-02-22 Thread nguyen duc Tuan
Hi Seth,
Here are the parameters that I used in my experiments:
- Number of executors: 16
- Executor memory: varied from 1G -> 2G -> 3G
- Number of cores per executor: 1 -> 2
- Driver memory: 1G -> 2G -> 3G
- Similarity threshold: 0.6
MinHash:
- Number of hash tables: 2
SignedRandomProjection:
- Number of hash tables: 2
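
For reference, this is roughly how the run is wired up (a sketch: "vectors" stands
in for our input DataFrame with a "features" column, and the join is shown as a
self-join; note that approxSimilarityJoin interprets the threshold as a distance,
not a similarity):

import org.apache.spark.ml.feature.MinHashLSH

val mh = new MinHashLSH()
  .setNumHashTables(2)
  .setInputCol("features")
  .setOutputCol("hashes")

val model = mh.fit(vectors)

// The join at the 0.6 threshold is the step that is slow / runs out of memory.
val joined = model.approxSimilarityJoin(vectors, vectors, 0.6)
joined.count()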

2017-02-23 0:13 GMT+07:00 Seth Hendrickson :

> I'm looking into this a bit further, thanks for bringing it up! Right now
> the LSH implementation only uses OR-amplification. The practical
> consequence of this is that it will select too many candidates when doing
> approximate near neighbor search and approximate similarity join. When we
> add AND-amplification I think it will become significantly more usable. In
> the meantime, I will also investigate scalability issues.
>
> Can you please provide every parameter you used? It will be very helfpul
> :) For instance, the similarity threshold, the number of hash tables, the
> bucket width, etc...
>
> Thanks!
>
> On Mon, Feb 13, 2017 at 3:21 PM, Nick Pentreath 
> wrote:
>
>> The original Uber authors provided this performance test result:
>> https://docs.google.com/document/d/19BXg-67U83NVB3M0
>> I84HVBVg3baAVaESD_mrg_-vLro
>>
>> This was for MinHash only though, so it's not clear about what the
>> scalability is for the other metric types.
>>
>> The SignRandomProjectionLSH is not yet in Spark master (see
>> https://issues.apache.org/jira/browse/SPARK-18082). It could be there
>> are some implementation details that would make a difference here.
>>
>> By the way, what is the join threshold you use in approx join?
>>
>> Could you perhaps create a JIRA ticket with the details in order to track
>> this?
>>
>>
>> On Sun, 12 Feb 2017 at 22:52 nguyen duc Tuan 
>> wrote:
>>
>>> After all, I switched back to LSH implementation that I used before (
>>> https://github.com/karlhigley/spark-neighbors ). I can run on my
>>> dataset now. If someone has any suggestion, please tell me.
>>> Thanks.
>>>
>>> 2017-02-12 9:25 GMT+07:00 nguyen duc Tuan :
>>>
>>> Hi Timur,
>>> 1) Our data is transformed to dataset of Vector already.
>>> 2) If I use RandomSignProjectLSH, the job dies after I call
>>> approximateSimilarJoin. I tried to use Minhash instead, the job is still
>>> slow. I don't thinks the problem is related to the GC. The time for GC is
>>> small compare with the time for computation. Here is some screenshots of my
>>> job.
>>> Thanks
>>>
>>> 2017-02-12 8:01 GMT+07:00 Timur Shenkao :
>>>
>>> Hello,
>>>
>>> 1) Are you sure that your data is "clean"?  No unexpected missing
>>> values? No strings in unusual encoding? No additional or missing columns ?
>>> 2) How long does your job run? What about garbage collector parameters?
>>> Have you checked what happens with jconsole / jvisualvm ?
>>>
>>> Sincerely yours, Timur
>>>
>>> On Sat, Feb 11, 2017 at 12:52 AM, nguyen duc Tuan 
>>> wrote:
>>>
>>> Hi Nick,
>>> Because we use *RandomSignProjectionLSH*, there is only one parameter
>>> for LSH is the number of hashes. I try with small number of hashes (2) but
>>> the error is still happens. And it happens when I call similarity join.
>>> After transformation, the size of  dataset is about 4G.
>>>
>>> 2017-02-11 3:07 GMT+07:00 Nick Pentreath :
>>>
>>> What other params are you using for the lsh transformer?
>>>
>>> Are the issues occurring during transform or during the similarity join?
>>>
>>>
>>> On Fri, 10 Feb 2017 at 05:46, nguyen duc Tuan 
>>> wrote:
>>>
>>> hi Das,
>>> In general, I will apply them to larger datasets, so I want to use LSH,
>>> which is more scaleable than the approaches as you suggested. Have you
>>> tried LSH in Spark 2.1.0 before ? If yes, how do you set the
>>> parameters/configuration to make it work ?
>>> Thanks.
>>>
>>> 2017-02-10 19:21 GMT+07:00 Debasish Das :
>>>
>>> If it is 7m rows and 700k features (or say 1m features) brute force row
>>> similarity will run fine as well...check out spark-4823...you can compare
>>> quality with approximate variant...
>>> On Feb 9, 2017 2:55 AM, "nguyen duc Tuan"  wrote:
>>>
>>> Hi everyone,
>>> Since spark 2.1.0 introduces LSH (http://spark.apache.org/docs/
>>> latest/ml-features.html#locality-sensitive-hashing), we want to use LSH
>>> to find approximately nearest neighbors. Basically, We have dataset with
>>> about 7M rows. we want to use cosine distance to meassure the similarity
>>> between items, so we use *RandomSignProjectionLSH* (
>>> https://gist.github.com/tuan3w/c968e56ea8ef135096eeedb08af097db)
>>> instead of *BucketedRandomProjectionLSH*. I try to tune some
>>> configurations such as serialization, memory fraction, executor memory
>>> (~6G), number of executors ( ~20), memory overhead ..., but nothing works.
>>> I often get error 

Spark Streaming - parallel recovery

2017-02-22 Thread Dominik Safaric
Hi,

As I am investigating, among other things, the fault recovery capabilities of Spark,
I've been curious: which source code artifact initiates the parallel recovery
process? In addition, how is a faulty node detected (from the driver's point of
view)?

Thanks in advance,
Dominik 



Re: Why does Spark Streaming application with Kafka fail with “requirement failed: numRecords must not be negative”?

2017-02-22 Thread Muhammad Haseeb Javed
I just noticed that the Spark version I am using (2.0.2) is built with
Scala 2.11. However, I am using Kafka 0.8.2 built with Scala 2.10. Could
this be the reason why we are getting this error?

On Mon, Feb 20, 2017 at 5:50 PM, Cody Koeninger  wrote:

> So there's no reason to use checkpointing at all, right?  Eliminate
> that as a possible source of problems.
>
> Probably unrelated, but this also isn't a very good way to benchmark.
> Kafka producers are threadsafe, there's no reason to create one for
> each partition.
>
> On Mon, Feb 20, 2017 at 4:43 PM, Muhammad Haseeb Javed
> <11besemja...@seecs.edu.pk> wrote:
> > This is the code that I have been trying is giving me this error. No
> > complicated operation being performed on the topics as far as I can see.
> >
> > class Identity() extends BenchBase {
> >
> >
> >   override def process(lines: DStream[(Long, String)], config:
> > SparkBenchConfig): Unit = {
> >
> > val reportTopic = config.reporterTopic
> >
> > val brokerList = config.brokerList
> >
> >
> > lines.foreachRDD(rdd => rdd.foreachPartition( partLines => {
> >
> >   val reporter = new KafkaReporter(reportTopic, brokerList)
> >
> >   partLines.foreach{ case (inTime , content) =>
> >
> > val outTime = System.currentTimeMillis()
> >
> > reporter.report(inTime, outTime)
> >
> > if(config.debugMode) {
> >
> >   println("Event: " + inTime + ", " + outTime)
> >
> > }
> >
> >   }
> >
> > }))
> >
> >   }
> >
> > }
> >
> >
> > On Mon, Feb 20, 2017 at 3:10 PM, Cody Koeninger 
> wrote:
> >>
> >> That's an indication that the beginning offset for a given batch is
> >> higher than the ending offset, i.e. something is seriously wrong.
> >>
> >> Are you doing anything at all odd with topics, i.e. deleting and
> >> recreating them, using compacted topics, etc?
> >>
> >> Start off with a very basic stream over the same kafka topic that just
> >> does foreach println or similar, with no checkpointing at all, and get
> >> that working first.
> >>
> >> On Mon, Feb 20, 2017 at 12:10 PM, Muhammad Haseeb Javed
> >> <11besemja...@seecs.edu.pk> wrote:
> >> > Update: I am using Spark 2.0.2 and  Kafka 0.8.2 with Scala 2.10
> >> >
> >> > On Mon, Feb 20, 2017 at 1:06 PM, Muhammad Haseeb Javed
> >> > <11besemja...@seecs.edu.pk> wrote:
> >> >>
> >> >> I am PhD student at Ohio State working on a study to evaluate
> streaming
> >> >> frameworks (Spark Streaming, Storm, Flink) using the the Intel
> HiBench
> >> >> benchmarks. But I think I am having a problem  with Spark. I have
> Spark
> >> >> Streaming application which I am trying to run on a 5 node cluster
> >> >> (including master). I have 2 zookeeper and 4 kafka brokers. However,
> >> >> whenever I run a Spark Streaming application I encounter the
> following
> >> >> error:
> >> >>
> >> >> java.lang.IllegalArgumentException: requirement failed: numRecords
> must
> >> >> not be negative
> >> >> at scala.Predef$.require(Predef.scala:224)
> >> >> at
> >> >>
> >> >> org.apache.spark.streaming.scheduler.StreamInputInfo.<
> init>(InputInfoTracker.scala:38)
> >> >> at
> >> >>
> >> >> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(
> DirectKafkaInputDStream.scala:165)
> >> >> at
> >> >>
> >> >> org.apache.spark.streaming.dstream.DStream$$anonfun$
> getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> >> >> at
> >> >>
> >> >> org.apache.spark.streaming.dstream.DStream$$anonfun$
> getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> >> >> at
> >> >> scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> >> >> at
> >> >>
> >> >> org.apache.spark.streaming.dstream.DStream$$anonfun$
> getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> >> >> at
> >> >>
> >> >> org.apache.spark.streaming.dstream.DStream$$anonfun$
> getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> >> >> at
> >> >>
> >> >> org.apache.spark.streaming.dstream.DStream.
> createRDDWithLocalProperties(DStream.scala:415)
> >> >> at
> >> >>
> >> >> org.apache.spark.streaming.dstream.DStream$$anonfun$
> getOrCompute$1.apply(DStream.scala:335)
> >> >> at
> >> >>
> >> >> org.apache.spark.streaming.dstream.DStream$$anonfun$
> getOrCompute$1.apply(DStream.scala:333)
> >> >> at scala.Option.orElse(Option.scala:289)
> >> >>
> >> >> The application starts fine, but as soon as the Kafka producers start
> >> >> emitting the stream data I start receiving the aforementioned error
> >> >> repeatedly.
> >> >>
> >> >> I have tried removing Spark Streaming checkpointing files as has been
> >> >> suggested in similar posts on the internet. However, the problem
> >> >> persists
> >> >> even if I start a Kafka topic and its corresponding consumer Spark
> >> >> Streaming
> >> >> application for the first time. Also the problem could not be offset
> >> >> related
> >> >> as I start 

Re: Spark Streaming: Using external data during stream transformation

2017-02-22 Thread Abhisheks
If I understand correctly, you need to create a UDF (if you are using Java,
extend the appropriate UDF interface, e.g. UDF1, UDF2, etc., depending on the
number of arguments) and keep this static list as a member variable in your class.

You can use this UDF as a filter in your stream directly.
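
In Scala, an equivalent sketch without a UDF is to pull the reference data out once,
broadcast it, and test membership inside the filter instead of issuing SQL per record
(field and variable names below are hypothetical, and the reference data is assumed
to fit in driver memory):

val locations: Set[String] = sqlContext
  .sql("select location from locations where id='001'")
  .collect()
  .map(_.getString(0))
  .toSet

val locationsBc = dStream.context.sparkContext.broadcast(locations)

// Membership test against the broadcast set inside the stream filter.
val filtered = dStream.filter(dp => locationsBc.value.contains(dp.location))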

On Tue, Feb 21, 2017 at 8:59 PM, nitishdeshpande [via Apache Spark User
List]  wrote:

> I have a situation where I have to filter data-points in a stream based on
> some condition involving a reference to external data. I have loaded up the
> external data in a Dataframe (so that I get to query on it using SQL
> interface). But when I tried to query on Dataframe I see that we cannot
> access it inside the transform (filter) function. (sample code below)
>
>  // DStream is created and temp table called 'locations' is registered
> dStream.filter(dp => {
>  val responseDf = sqlContext.sql("select location from
> locations where id='001'")
>  responseDf.show()  //nothing is displayed
>  // some condition evaluation using responseDf
>  true
> })
>
> Am I doing something wrong? If yes, then what would be a better approach
> to load external data in-memory and query it during stream transformation
> stage.
>
> Link to S.O question http://stackoverflow.com/questions/42362012/spark-
> streaming-using-external-data-during-stream-transformation
>
>




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Using-external-data-during-stream-transformation-tp28412p28415.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Executor links in Job History

2017-02-22 Thread yohann jardin
Hello,


I'm using Spark 2.1.0 and Hadoop 2.2.0.

When I launch jobs on YARN, I can retrieve their information on the Spark History
Server, except that the links to the executors' stdout/stderr are wrong: they
point to the URLs they had while the job was running.


We have the flag 'yarn.log-aggregation-enable' set to true and once a job is 
finished on Yarn, its logs are sent to HDFS.


On the client end, when I launch my job I set 'spark.eventLog.enabled' to true
and specify 'spark.eventLog.dir'. I can retrieve the DAG and such afterward on
the Spark History Server.


I checked http://spark.apache.org/docs/latest/running-on-yarn.html and 
http://spark.apache.org/docs/latest/monitoring.html

But I cannot find what I'm missing to make the Spark History Server redirect me to
the YARN history server with a valid link, so that I can see the stdout/stderr logs
of the executors.



Any idea?


Regards,

Yohann


Re: Practical configuration to run LSH in Spark 2.1.0

2017-02-22 Thread Seth Hendrickson
I'm looking into this a bit further, thanks for bringing it up! Right now
the LSH implementation only uses OR-amplification. The practical
consequence of this is that it will select too many candidates when doing
approximate near neighbor search and approximate similarity join. When we
add AND-amplification I think it will become significantly more usable. In
the meantime, I will also investigate scalability issues.

Can you please provide every parameter you used? It will be very helpful :)
For instance, the similarity threshold, the number of hash tables, the
bucket width, etc...

Thanks!

On Mon, Feb 13, 2017 at 3:21 PM, Nick Pentreath 
wrote:

> The original Uber authors provided this performance test result:
> https://docs.google.com/document/d/19BXg-67U83NVB3M0I84HVBVg3baAVaESD_
> mrg_-vLro
>
> This was for MinHash only though, so it's not clear about what the
> scalability is for the other metric types.
>
> The SignRandomProjectionLSH is not yet in Spark master (see
> https://issues.apache.org/jira/browse/SPARK-18082). It could be there are
> some implementation details that would make a difference here.
>
> By the way, what is the join threshold you use in approx join?
>
> Could you perhaps create a JIRA ticket with the details in order to track
> this?
>
>
> On Sun, 12 Feb 2017 at 22:52 nguyen duc Tuan  wrote:
>
>> After all, I switched back to LSH implementation that I used before (
>> https://github.com/karlhigley/spark-neighbors ). I can run on my dataset
>> now. If someone has any suggestion, please tell me.
>> Thanks.
>>
>> 2017-02-12 9:25 GMT+07:00 nguyen duc Tuan :
>>
>> Hi Timur,
>> 1) Our data is transformed to dataset of Vector already.
>> 2) If I use RandomSignProjectLSH, the job dies after I call
>> approximateSimilarJoin. I tried to use Minhash instead, the job is still
>> slow. I don't thinks the problem is related to the GC. The time for GC is
>> small compare with the time for computation. Here is some screenshots of my
>> job.
>> Thanks
>>
>> 2017-02-12 8:01 GMT+07:00 Timur Shenkao :
>>
>> Hello,
>>
>> 1) Are you sure that your data is "clean"?  No unexpected missing values?
>> No strings in unusual encoding? No additional or missing columns ?
>> 2) How long does your job run? What about garbage collector parameters?
>> Have you checked what happens with jconsole / jvisualvm ?
>>
>> Sincerely yours, Timur
>>
>> On Sat, Feb 11, 2017 at 12:52 AM, nguyen duc Tuan 
>> wrote:
>>
>> Hi Nick,
>> Because we use *RandomSignProjectionLSH*, there is only one parameter
>> for LSH is the number of hashes. I try with small number of hashes (2) but
>> the error is still happens. And it happens when I call similarity join.
>> After transformation, the size of  dataset is about 4G.
>>
>> 2017-02-11 3:07 GMT+07:00 Nick Pentreath :
>>
>> What other params are you using for the lsh transformer?
>>
>> Are the issues occurring during transform or during the similarity join?
>>
>>
>> On Fri, 10 Feb 2017 at 05:46, nguyen duc Tuan 
>> wrote:
>>
>> hi Das,
>> In general, I will apply them to larger datasets, so I want to use LSH,
>> which is more scaleable than the approaches as you suggested. Have you
>> tried LSH in Spark 2.1.0 before ? If yes, how do you set the
>> parameters/configuration to make it work ?
>> Thanks.
>>
>> 2017-02-10 19:21 GMT+07:00 Debasish Das :
>>
>> If it is 7m rows and 700k features (or say 1m features) brute force row
>> similarity will run fine as well...check out spark-4823...you can compare
>> quality with approximate variant...
>> On Feb 9, 2017 2:55 AM, "nguyen duc Tuan"  wrote:
>>
>> Hi everyone,
>> Since spark 2.1.0 introduces LSH (http://spark.apache.org/docs/
>> latest/ml-features.html#locality-sensitive-hashing), we want to use LSH
>> to find approximately nearest neighbors. Basically, We have dataset with
>> about 7M rows. we want to use cosine distance to meassure the similarity
>> between items, so we use *RandomSignProjectionLSH* (
>> https://gist.github.com/tuan3w/c968e56ea8ef135096eeedb08af097db) instead
>> of *BucketedRandomProjectionLSH*. I try to tune some configurations such
>> as serialization, memory fraction, executor memory (~6G), number of
>> executors ( ~20), memory overhead ..., but nothing works. I often get error
>> "java.lang.OutOfMemoryError: Java heap space" while running. I know that
>> this implementation is done by engineer at Uber but I don't know right
>> configurations,.. to run the algorithm at scale. Do they need very big
>> memory to run it?
>>
>> Any help would be appreciated.
>> Thanks
>>
>>
>>
>>
>>
>>
>>


Re: Spark executors in streaming app always uses 2 executors

2017-02-22 Thread Jon Gregg
Spark offers a receiver-based approach or a direct approach with Kafka (
https://spark.apache.org/docs/2.1.0/streaming-kafka-0-8-integration.html),
and a note in the receiver-based approach says "topic partitions in Kafka does
not correlate to partitions of RDDs generated in Spark Streaming."

A fix might be as simple as switching to the direct approach?

Jon Gregg

On Wed, Feb 22, 2017 at 12:37 AM, satishl  wrote:

> I am reading from a kafka topic which has 8 partitions. My spark app is
> given
> 40 executors (1 core per executor). After reading the data, I repartition
> the dstream by 500, map it and save it to cassandra.
> However, I see that only 2 executors are being used per batch. Even though I
> see 500 tasks for the stage, all of them are sequentially scheduled on the 2
> executors picked. My Spark concepts are still forming and I am missing
> something obvious.
> I expected that 8 executors will be picked for reading data from the 8
> partitions in kafka, and then with the repartition this data will be
> distributed between 40 executors and then saved to cassandra.
> How should I think about this?
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Spark-executors-in-streaming-app-
> always-uses-2-executors-tp28413.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>


Re: Spark SQL : Join operation failure

2017-02-22 Thread Yong Zhang
Your error message is not clear about what really happened.


Is your container killed by YARN, or does it indeed run OOM?

When I run a Spark job with big data, here is normally what I do:

1) Enable GC output (see the sketch after this list). You need to monitor the GC
output in the executors to understand the GC pressure. If you see frequent full GCs,
you know your job is in danger.
2) Monitor the statistics of the tasks on the executors with frequent full GCs: how
many records have been processed so far, and what are the spill read/write bytes. Is
the OOM only happening in one task with much higher statistics than the rest? That
normally means data skew. If lots of tasks all have GC pressure, then your settings
are simply not enough for the job.
3) In your case, you first want to know what kind of join Spark is using for your
outer join. Does it make sense for your data? The wrong join strategy could lead to
the wrong way of doing the job.
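
For item 1, a sketch of the usual way to turn GC output on: pass standard JVM flags
through spark.executor.extraJavaOptions when submitting, e.g.

--conf "spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"

and then read the GC lines in the executor stderr logs.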

Yong


From: jatinpreet 
Sent: Wednesday, February 22, 2017 1:11 AM
To: user@spark.apache.org
Subject: Spark SQL : Join operation failure

Hi,

I am having a hard time running an outer join operation on two Parquet
datasets. The datasets are large, ~500GB, with a lot of columns, on the order of
1000.

As per administrator-imposed YARN limits on the queue, I can have a total of 20
vcores and 8GB of memory per executor.

I specified memory overhead and increased the number of shuffle partitions to no
avail. This is how I submitted the job with pyspark:

spark-submit --master yarn-cluster --executor-memory 5500m --num-executors
19 --executor-cores 1 --conf spark.yarn.executor.memoryOverhead=2000 --conf
spark.sql.shuffle.partitions=2048 --driver-memory 7g --queue
./

The relevant code is,

cm_go.registerTempTable("x")
ko.registerTempTable("y")
joined_df = sqlCtx.sql("select * from x FULL OUTER JOIN y ON field1=field2")
joined_df.write.save("/user/data/output")


I am getting errors like these:

ExecutorLostFailure (executor 5 exited caused by one of the running tasks)
Reason: Container marked as failed:
container_e36_1487531133522_0058_01_06 on host: dn2.bigdatalab.org. Exit
status: 52. Diagnostics: Exception from container-launch.
Container id: container_e36_1487531133522_0058_01_06
Exit code: 52
Stack trace: ExitCodeException exitCode=52:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:933)
at org.apache.hadoop.util.Shell.run(Shell.java:844)
at
org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1123)
at
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:225)
at
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:317)
at
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:83)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


Container exited with a non-zero exit code 52

--

FetchFailed(null, shuffleId=0, mapId=-1, reduceId=508, message=
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
location for shuffle 0
at
org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:695)
at
org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:691)
at
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at
org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:691)
at
org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:145)
at
org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:49)
at
org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:169)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at

[ANNOUNCE] Apache Bahir 2.1.0 Released

2017-02-22 Thread Christian Kadner
The Apache Bahir community is pleased to announce the release
of Apache Bahir 2.1.0 which provides the following extensions for
Apache Spark 2.1.0:

   - Akka Streaming
   - MQTT Streaming
   - MQTT Structured Streaming
   - Twitter Streaming
   - ZeroMQ Streaming

For more information about Apache Bahir and to download the
latest release go to:

http://bahir.apache.org

The Apache Bahir streaming connectors are also available at:

https://spark-packages.org/?q=bahir

---
Best regards,
Christian Kadner