Option Encoder

2016-06-23 Thread Richard Marscher
Is there a proper way to make or get an Encoder for Option in Spark 2.0?
There isn't one by default and while ExpressionEncoder from catalyst will
work, it is private and unsupported.
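
A rough sketch of the ExpressionEncoder workaround mentioned above, plus a
Kryo-backed encoder as one other possibility (not from the original message);
Option[Int] is just an example element type:

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

// Internal/unsupported API, as noted above, but it does produce an Encoder.
implicit val optIntEncoder: Encoder[Option[Int]] = ExpressionEncoder[Option[Int]]()

// Alternative: serialize the whole Option with Kryo, trading the columnar layout
// for opaque binary blobs.
// implicit val optIntKryo: Encoder[Option[Int]] = Encoders.kryo[Option[Int]]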

-- 
*Richard Marscher*
Senior Software Engineer
Localytics
Localytics.com <http://localytics.com/> | Our Blog
<http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
Facebook <http://facebook.com/localytics> | LinkedIn
<http://www.linkedin.com/company/1148792?trk=tyah>


Re: Dataset - reduceByKey

2016-06-07 Thread Richard Marscher
There certainly are some gaps between the richness of the RDD API and the
Dataset API. I'm also migrating from RDD to Dataset and ran into
reduceByKey and join scenarios.

On the spark-dev list, one person was discussing reduceByKey being
sub-optimal at the moment, which spawned this JIRA:
https://issues.apache.org/jira/browse/SPARK-15598. But you might be able to
get by with groupBy().reduce() for now; check the performance, though.
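
A rough sketch of the groupBy().reduce() route for reference; the data is made
up, and the method names shifted between 1.6 (groupBy/reduce) and the 2.0 API
(groupByKey/reduceGroups) used here:

import spark.implicits._  // assumes a SparkSession named spark

val pairs = Seq(("a", 1), ("b", 2), ("a", 3)).toDS()

// RDD style would be rdd.reduceByKey(_ + _); the Dataset equivalent for now:
val reduced = pairs
  .groupByKey { case (k, _) => k }
  .reduceGroups((l, r) => (l._1, l._2 + r._2))  // Dataset[(String, (String, Int))]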

As for join, the approach would be using the joinWith function on Dataset.
The API isn't as sugary as it was for RDD, IMO, which is something
I've been discussing in a separate thread as well. I can't find a web link
for it, but the thread subject is "Dataset Outer Join vs RDD Outer Join".
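
For reference, a minimal joinWith sketch against the 2.0 API (made-up data;
the aliases just let the condition refer to each side):

import spark.implicits._  // assumes a SparkSession named spark

val users  = Seq((1, "alice"), (2, "bob")).toDS()
val orders = Seq((1, 9.99), (1, 5.00)).toDS()

// joinWith keeps both sides as typed values: Dataset[((Int, String), (Int, Double))]
val joined = users.as("u").joinWith(orders.as("o"), $"u._1" === $"o._1", "inner")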

On Tue, Jun 7, 2016 at 2:40 PM, Bryan Jeffrey <bryan.jeff...@gmail.com>
wrote:

> It would also be nice if there was a better example of joining two
> Datasets. I am looking at the documentation here:
> http://spark.apache.org/docs/latest/sql-programming-guide.html. It seems
> a little bit sparse - is there a better documentation source?
>
> Regards,
>
> Bryan Jeffrey
>
> On Tue, Jun 7, 2016 at 2:32 PM, Bryan Jeffrey <bryan.jeff...@gmail.com>
> wrote:
>
>> Hello.
>>
>> I am looking at the option of moving RDD based operations to Dataset
>> based operations.  We are calling 'reduceByKey' on some pair RDDs we have.
>> What would the equivalent be in the Dataset interface - I do not see a
>> simple reduceByKey replacement.
>>
>> Regards,
>>
>> Bryan Jeffrey
>>
>>
>


-- 
*Richard Marscher*
Senior Software Engineer
Localytics
Localytics.com <http://localytics.com/> | Our Blog
<http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
Facebook <http://facebook.com/localytics> | LinkedIn
<http://www.linkedin.com/company/1148792?trk=tyah>


Re: Dataset Outer Join vs RDD Outer Join

2016-06-07 Thread Richard Marscher
For anyone following along the chain went private for a bit, but there were
still issues with the bytecode generation in the 2.0-preview so this JIRA
was created: https://issues.apache.org/jira/browse/SPARK-15786

On Mon, Jun 6, 2016 at 1:11 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> That kind of stuff is likely fixed in 2.0.  If you can get a reproduction
> working there it would be very helpful if you could open a JIRA.
>
> On Mon, Jun 6, 2016 at 7:37 AM, Richard Marscher <rmarsc...@localytics.com
> > wrote:
>
>> A quick unit test attempt didn't get far replacing map with as[], I'm
>> only working against 1.6.1 at the moment though, I was going to try 2.0 but
>> I'm having a hard time building a working spark-sql jar from source, the
>> only ones I've managed to make are intended for the full assembly fat jar.
>>
>>
>> Example of the error from calling joinWith as left_outer and then
>> .as[(Option[T], U)], where T and U are both Int.
>>
>> [info] newinstance(class scala.Tuple2,decodeusingserializer(input[0,
>> StructType(StructField(_1,IntegerType,true),
>> StructField(_2,IntegerType,true))],scala.Option,true),decodeusingserializer(input[1,
>> StructType(StructField(_1,IntegerType,true),
>> StructField(_2,IntegerType,true))],scala.Option,true),false,ObjectType(class
>> scala.Tuple2),None)
>> [info] :- decodeusingserializer(input[0,
>> StructType(StructField(_1,IntegerType,true),
>> StructField(_2,IntegerType,true))],scala.Option,true)
>> [info] :  +- input[0, StructType(StructField(_1,IntegerType,true),
>> StructField(_2,IntegerType,true))]
>> [info] +- decodeusingserializer(input[1,
>> StructType(StructField(_1,IntegerType,true),
>> StructField(_2,IntegerType,true))],scala.Option,true)
>> [info]+- input[1, StructType(StructField(_1,IntegerType,true),
>> StructField(_2,IntegerType,true))]
>>
>> Cause: java.util.concurrent.ExecutionException: java.lang.Exception:
>> failed to compile: org.codehaus.commons.compiler.CompileException: File
>> 'generated.java', Line 32, Column 60: No applicable constructor/method
>> found for actual parameters "org.apache.spark.sql.catalyst.InternalRow";
>> candidates are: "public static java.nio.ByteBuffer
>> java.nio.ByteBuffer.wrap(byte[])", "public static java.nio.ByteBuffer
>> java.nio.ByteBuffer.wrap(byte[], int, int)"
>>
>> The generated code is passing InternalRow objects into the ByteBuffer
>>
>> Starting from two Datasets of types Dataset[(Int, Int)] with expression
>> $"left._1" === $"right._1". I'll have to spend some time getting a better
>> understanding of this analysis phase, but hopefully I can come up with
>> something.
>>
>> On Wed, Jun 1, 2016 at 3:43 PM, Michael Armbrust <mich...@databricks.com>
>> wrote:
>>
>>> Option should play nicely with encoders, but it's always possible there
>>> are bugs.  I think those function signatures are slightly more expensive
>>> (one extra object allocation) and it's not as Java-friendly, so we probably
>>> don't want them to be the default.
>>>
>>> That said, I would like to enable that kind of sugar while still taking
>>> advantage of all the optimizations going on under the covers.  Can you get
>>> it to work if you use `as[...]` instead of `map`?
>>>
>>> On Wed, Jun 1, 2016 at 11:59 AM, Richard Marscher <
>>> rmarsc...@localytics.com> wrote:
>>>
>>>> Ah thanks, I missed seeing the PR for
>>>> https://issues.apache.org/jira/browse/SPARK-15441. If the rows became
>>>> null objects then I can implement methods that will map those back to
>>>> results that align closer to the RDD interface.
>>>>
>>>> As a follow on, I'm curious about thoughts regarding enriching the
>>>> Dataset join interface versus a package or users sugaring for themselves. I
>>>> haven't considered the implications of what the optimizations datasets,
>>>> tungsten, and/or bytecode gen can do now regarding joins so I may be
>>>> missing a critical benefit there around say avoiding Options in favor of
>>>> nulls. If nothing else, I guess Option doesn't have a first class Encoder
>>>> or DataType yet and maybe for good reasons.
>>>>
>>>> I did find the RDD join interface elegant, though. In the ideal world
>>>> an API comparable to the following would be nice:
>>>> https://gist.github.com/rmarsch/3ea78b3a9a8a0e83ce162ed947fcab06
>>>>
>>>>
>>>> On Wed, Jun 1,

Re: Dataset Outer Join vs RDD Outer Join

2016-06-06 Thread Richard Marscher
A quick unit test attempt didn't get far replacing map with as[]. I'm only
working against 1.6.1 at the moment, though; I was going to try 2.0, but I'm
having a hard time building a working spark-sql jar from source. The only
ones I've managed to make are intended for the full assembly fat jar.


Example of the error from calling joinWith as left_outer and then
.as[(Option[T], U)], where T and U are both Int.

[info] newinstance(class scala.Tuple2,decodeusingserializer(input[0,
StructType(StructField(_1,IntegerType,true),
StructField(_2,IntegerType,true))],scala.Option,true),decodeusingserializer(input[1,
StructType(StructField(_1,IntegerType,true),
StructField(_2,IntegerType,true))],scala.Option,true),false,ObjectType(class
scala.Tuple2),None)
[info] :- decodeusingserializer(input[0,
StructType(StructField(_1,IntegerType,true),
StructField(_2,IntegerType,true))],scala.Option,true)
[info] :  +- input[0, StructType(StructField(_1,IntegerType,true),
StructField(_2,IntegerType,true))]
[info] +- decodeusingserializer(input[1,
StructType(StructField(_1,IntegerType,true),
StructField(_2,IntegerType,true))],scala.Option,true)
[info]+- input[1, StructType(StructField(_1,IntegerType,true),
StructField(_2,IntegerType,true))]

Cause: java.util.concurrent.ExecutionException: java.lang.Exception: failed
to compile: org.codehaus.commons.compiler.CompileException: File
'generated.java', Line 32, Column 60: No applicable constructor/method
found for actual parameters "org.apache.spark.sql.catalyst.InternalRow";
candidates are: "public static java.nio.ByteBuffer
java.nio.ByteBuffer.wrap(byte[])", "public static java.nio.ByteBuffer
java.nio.ByteBuffer.wrap(byte[], int, int)"

The generated code is passing InternalRow objects into the ByteBuffer

Starting from two Datasets of types Dataset[(Int, Int)] with expression
$"left._1" === $"right._1". I'll have to spend some time getting a better
understanding of this analysis phase, but hopefully I can come up with
something.

On Wed, Jun 1, 2016 at 3:43 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> Option should play nicely with encoders, but it's always possible there
> are bugs.  I think those function signatures are slightly more expensive
> (one extra object allocation) and it's not as Java-friendly, so we probably
> don't want them to be the default.
>
> That said, I would like to enable that kind of sugar while still taking
> advantage of all the optimizations going on under the covers.  Can you get
> it to work if you use `as[...]` instead of `map`?
>
> On Wed, Jun 1, 2016 at 11:59 AM, Richard Marscher <
> rmarsc...@localytics.com> wrote:
>
>> Ah thanks, I missed seeing the PR for
>> https://issues.apache.org/jira/browse/SPARK-15441. If the rows became
>> null objects then I can implement methods that will map those back to
>> results that align closer to the RDD interface.
>>
>> As a follow on, I'm curious about thoughts regarding enriching the
>> Dataset join interface versus a package or users sugaring for themselves. I
>> haven't considered the implications of what the optimizations datasets,
>> tungsten, and/or bytecode gen can do now regarding joins so I may be
>> missing a critical benefit there around say avoiding Options in favor of
>> nulls. If nothing else, I guess Option doesn't have a first class Encoder
>> or DataType yet and maybe for good reasons.
>>
>> I did find the RDD join interface elegant, though. In the ideal world an
>> API comparable to the following would be nice:
>> https://gist.github.com/rmarsch/3ea78b3a9a8a0e83ce162ed947fcab06
>>
>>
>> On Wed, Jun 1, 2016 at 1:42 PM, Michael Armbrust <mich...@databricks.com>
>> wrote:
>>
>>> Thanks for the feedback.  I think this will address at least some of the
>>> problems you are describing: https://github.com/apache/spark/pull/13425
>>>
>>> On Wed, Jun 1, 2016 at 9:58 AM, Richard Marscher <
>>> rmarsc...@localytics.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I've been working on transitioning from RDD to Datasets in our codebase
>>>> in anticipation of being able to leverage features of 2.0.
>>>>
>>>> I'm having a lot of difficulties with the impedance mismatches between
>>>> how outer joins worked with RDD versus Dataset. The Dataset joins feel like
>>>> a big step backwards IMO. With RDD, leftOuterJoin would give you Option
>>>> types of the results from the right side of the join. This follows
>>>> idiomatic Scala avoiding nulls and was easy to work with.
>>>>
>>>> Now with Dataset there is only joinWith where you specify the join
>>>> type, but it l

Re: Dataset Outer Join vs RDD Outer Join

2016-06-01 Thread Richard Marscher
Ah thanks, I missed seeing the PR for
https://issues.apache.org/jira/browse/SPARK-15441. If the rows became null
objects then I can implement methods that will map those back to results
that align closer to the RDD interface.
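
A hedged sketch of that idea (hypothetical names; it assumes the null
behavior from SPARK-15441 and that an Encoder for the Option-bearing tuple
can be resolved, which is exactly the open question in this thread):

import org.apache.spark.sql.{Column, Dataset, Encoder}

object DatasetJoinOps {
  implicit class RichJoins[T](val left: Dataset[T]) extends AnyVal {
    // Surface missing right-side rows as None instead of null.
    def leftOuterJoinWith[U](right: Dataset[U], cond: Column)(
        implicit enc: Encoder[(T, Option[U])]): Dataset[(T, Option[U])] =
      left.joinWith(right, cond, "left_outer").map { case (l, r) => (l, Option(r)) }
  }
}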

As a follow-on, I'm curious about thoughts regarding enriching the Dataset
join interface versus a package, or users adding the sugar themselves. I haven't
considered the implications of what the optimizations in Datasets, Tungsten,
and/or bytecode generation can now do regarding joins, so I may be missing a
critical benefit there around, say, avoiding Options in favor of nulls. If
nothing else, I guess Option doesn't have a first-class Encoder or DataType
yet, and maybe for good reasons.

I did find the RDD join interface elegant, though. In the ideal world an
API comparable to the following would be nice:
https://gist.github.com/rmarsch/3ea78b3a9a8a0e83ce162ed947fcab06


On Wed, Jun 1, 2016 at 1:42 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> Thanks for the feedback.  I think this will address at least some of the
> problems you are describing: https://github.com/apache/spark/pull/13425
>
> On Wed, Jun 1, 2016 at 9:58 AM, Richard Marscher <rmarsc...@localytics.com
> > wrote:
>
>> Hi,
>>
>> I've been working on transitioning from RDD to Datasets in our codebase
>> in anticipation of being able to leverage features of 2.0.
>>
>> I'm having a lot of difficulties with the impedance mismatches between
>> how outer joins worked with RDD versus Dataset. The Dataset joins feel like
>> a big step backwards IMO. With RDD, leftOuterJoin would give you Option
>> types of the results from the right side of the join. This follows
>> idiomatic Scala avoiding nulls and was easy to work with.
>>
>> Now with Dataset there is only joinWith where you specify the join type,
>> but it lost all the semantics of identifying missing data from outer joins.
>> I can write some enriched methods on Dataset with an implicit class to
>> abstract messiness away if Dataset nulled out all mismatching data from an
>> outer join, however the problem goes even further in that the values aren't
>> always null. Integer, for example, defaults to -1 instead of null. Now it's
>> completely ambiguous what data in the join was actually there versus
>> populated via this atypical semantic.
>>
>> Are there additional options available to work around this issue? I can
>> convert to RDD and back to Dataset but that's less than ideal.
>>
>> Thanks,
>> --
>> *Richard Marscher*
>> Senior Software Engineer
>> Localytics
>> Localytics.com <http://localytics.com/> | Our Blog
>> <http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
>> Facebook <http://facebook.com/localytics> | LinkedIn
>> <http://www.linkedin.com/company/1148792?trk=tyah>
>>
>
>


-- 
*Richard Marscher*
Senior Software Engineer
Localytics
Localytics.com <http://localytics.com/> | Our Blog
<http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
Facebook <http://facebook.com/localytics> | LinkedIn
<http://www.linkedin.com/company/1148792?trk=tyah>


Dataset Outer Join vs RDD Outer Join

2016-06-01 Thread Richard Marscher
Hi,

I've been working on transitioning from RDD to Datasets in our codebase in
anticipation of being able to leverage features of 2.0.

I'm having a lot of difficulties with the impedance mismatches between how
outer joins worked with RDD versus Dataset. The Dataset joins feel like a
big step backwards IMO. With RDD, leftOuterJoin would give you Option types
of the results from the right side of the join. This follows idiomatic
Scala in avoiding nulls and was easy to work with.
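
For contrast, the RDD shape being described looks roughly like this (types
only, with made-up data and an assumed SparkContext sc):

import org.apache.spark.rdd.RDD

val left: RDD[(Int, String)] = sc.parallelize(Seq(1 -> "a", 2 -> "b"))
val right: RDD[(Int, Long)] = sc.parallelize(Seq(1 -> 10L))

// Missing right-side matches come back as None; no nulls involved.
val joined: RDD[(Int, (String, Option[Long]))] = left.leftOuterJoin(right)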

Now with Dataset there is only joinWith where you specify the join type,
but it lost all the semantics of identifying missing data from outer joins.
I can write some enriched methods on Dataset with an implicit class to
abstract the messiness away if Dataset nulled out all mismatching data from an
outer join; however, the problem goes even further in that the values aren't
always null. Integer, for example, defaults to -1 instead of null. Now it's
completely ambiguous what data in the join was actually there versus
populated via this atypical semantic.

Are there additional options available to work around this issue? I can
convert to RDD and back to Dataset but that's less than ideal.

Thanks,
-- 
*Richard Marscher*
Senior Software Engineer
Localytics
Localytics.com <http://localytics.com/> | Our Blog
<http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
Facebook <http://facebook.com/localytics> | LinkedIn
<http://www.linkedin.com/company/1148792?trk=tyah>


Re: Local Mode: Executor thread leak?

2015-12-08 Thread Richard Marscher
Alright I was able to work through the problem.

So the owning thread was one from the executor task launch worker, which at
least in local mode runs the task and the related user code of the task.
After judiciously naming every thread in the pools in the user code (with a
custom ThreadFactory) I was able to trace down the leak to a couple thread
pools that were not shut down properly by noticing the named threads
accumulating in thread dumps of the JVM process.
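
For anyone wanting to do the same, a minimal sketch of the naming approach
(the prefix and example pool size are arbitrary):

import java.util.concurrent.{Executors, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger

class NamedThreadFactory(prefix: String) extends ThreadFactory {
  private val counter = new AtomicInteger(0)
  override def newThread(r: Runnable): Thread = {
    val t = new Thread(r, s"$prefix-${counter.incrementAndGet()}")
    t.setDaemon(true)  // a stray pool then can't keep the JVM alive on its own
    t
  }
}

// e.g. Executors.newFixedThreadPool(4, new NamedThreadFactory("db-lookup-pool"))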

On Mon, Dec 7, 2015 at 6:41 PM, Richard Marscher <rmarsc...@localytics.com>
wrote:

> Thanks for the response.
>
> The version is Spark 1.5.2.
>
> Some examples of the thread names:
>
> pool-1061-thread-1
> pool-1059-thread-1
> pool-1638-thread-1
>
> There become hundreds then thousands of these stranded in WAITING.
>
> I added logging to try to track the lifecycle of the thread pool in
> Executor as mentioned before. Here is an excerpt, but everything seems fine
> there. Every executor that starts is also shut down and it seems like it
> shuts down fine.
>
> 15/12/07 23:30:21 WARN o.a.s.e.Executor: Threads finished in executor
> driver. pool shut down 
> java.util.concurrent.ThreadPoolExecutor@e5d036b[Terminated,
> pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1]
> 15/12/07 23:30:28 WARN o.a.s.e.Executor: Executor driver created, thread
> pool: java.util.concurrent.ThreadPoolExecutor@3bc41ae3[Running, pool size
> = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
> 15/12/07 23:31:06 WARN o.a.s.e.Executor: Threads finished in executor
> driver. pool shut down 
> java.util.concurrent.ThreadPoolExecutor@3bc41ae3[Terminated,
> pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 36]
> 15/12/07 23:31:11 WARN o.a.s.e.Executor: Executor driver created, thread
> pool: java.util.concurrent.ThreadPoolExecutor@6e85ece4[Running, pool size
> = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
> 15/12/07 23:34:35 WARN o.a.s.e.Executor: Threads finished in executor
> driver. pool shut down 
> java.util.concurrent.ThreadPoolExecutor@6e85ece4[Terminated,
> pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 288]
>
> Also here is an example thread dump of such a thread:
>
> "pool-493-thread-1" prio=10 tid=0x7f0e60612800 nid=0x18c4 waiting on
> condition [0x7f0c33c3e000]
>java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x7f10b3e8fb60> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
> at
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
> at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
>
> On Mon, Dec 7, 2015 at 6:23 PM, Shixiong Zhu <zsxw...@gmail.com> wrote:
>
>> Which version are you using? Could you post these thread names here?
>>
>> Best Regards,
>> Shixiong Zhu
>>
>> 2015-12-07 14:30 GMT-08:00 Richard Marscher <rmarsc...@localytics.com>:
>>
>>> Hi,
>>>
>>> I've been running benchmarks against Spark in local mode in a long
>>> running process. I'm seeing threads leaking each time it runs a job. It
>>> doesn't matter if I recycle SparkContext constantly or have 1 context stay
>>> alive for the entire application lifetime.
>>>
>>> I see a huge accumulation ongoing of "pool--thread-1" threads with
>>> the creating thread "Executor task launch worker-xx" where x's are numbers.
>>> The number of leaks per launch worker varies but usually 1 to a few.
>>>
>>> Searching the Spark code the pool is created in the Executor class. It
>>> is `.shutdown()` in the stop for the executor. I've wired up logging and
>>> also tried shutdownNow() and awaitTermination on the pools. Everything seems
>>> okay there for every Executor that is called with `stop()`, but I'm still
>>> not sure yet if every Executor is called as such, which I am looking into
>>> now.
>>>
>>> What I'm curious to know is if anyone has seen a similar issue?
>>>
>>> --
>>> *Richard Marscher*
>>> Software Eng

Local Mode: Executor thread leak?

2015-12-07 Thread Richard Marscher
Hi,

I've been running benchmarks against Spark in local mode in a long running
process. I'm seeing threads leaking each time it runs a job. It doesn't
matter if I recycle SparkContext constantly or have 1 context stay alive
for the entire application lifetime.

I see a huge ongoing accumulation of "pool-xx-thread-1" threads with the
creating thread "Executor task launch worker-xx", where the x's are numbers. The
number of leaks per launch worker varies but usually 1 to a few.

Searching the Spark code the pool is created in the Executor class. It is
`.shutdown()` in the stop for the executor. I've wired up logging and also
tried shutdownNow() and awaitTermination on the pools. Everything seems okay
there for every Executor that is called with `stop()`, but I'm still not
sure yet if every Executor is called as such, which I am looking into now.

What I'm curious to know is if anyone has seen a similar issue?

-- 
*Richard Marscher*
Software Engineer
Localytics
Localytics.com <http://localytics.com/> | Our Blog
<http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
Facebook <http://facebook.com/localytics> | LinkedIn
<http://www.linkedin.com/company/1148792?trk=tyah>


Re: Local Mode: Executor thread leak?

2015-12-07 Thread Richard Marscher
Thanks for the response.

The version is Spark 1.5.2.

Some examples of the thread names:

pool-1061-thread-1
pool-1059-thread-1
pool-1638-thread-1

There become hundreds then thousands of these stranded in WAITING.

I added logging to try to track the lifecycle of the thread pool in
Executor as mentioned before. Here is an excerpt, but everything seems fine
there. Every executor that starts is also shut down and it seems like it
shuts down fine.

15/12/07 23:30:21 WARN o.a.s.e.Executor: Threads finished in executor
driver. pool shut down
java.util.concurrent.ThreadPoolExecutor@e5d036b[Terminated,
pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1]
15/12/07 23:30:28 WARN o.a.s.e.Executor: Executor driver created, thread
pool: java.util.concurrent.ThreadPoolExecutor@3bc41ae3[Running, pool size =
0, active threads = 0, queued tasks = 0, completed tasks = 0]
15/12/07 23:31:06 WARN o.a.s.e.Executor: Threads finished in executor
driver. pool shut down
java.util.concurrent.ThreadPoolExecutor@3bc41ae3[Terminated,
pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 36]
15/12/07 23:31:11 WARN o.a.s.e.Executor: Executor driver created, thread
pool: java.util.concurrent.ThreadPoolExecutor@6e85ece4[Running, pool size =
0, active threads = 0, queued tasks = 0, completed tasks = 0]
15/12/07 23:34:35 WARN o.a.s.e.Executor: Threads finished in executor
driver. pool shut down
java.util.concurrent.ThreadPoolExecutor@6e85ece4[Terminated,
pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 288]

Also here is an example thread dump of such a thread:

"pool-493-thread-1" prio=10 tid=0x7f0e60612800 nid=0x18c4 waiting on
condition [0x7f0c33c3e000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x7f10b3e8fb60> (a
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
at
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

On Mon, Dec 7, 2015 at 6:23 PM, Shixiong Zhu <zsxw...@gmail.com> wrote:

> Which version are you using? Could you post these thread names here?
>
> Best Regards,
> Shixiong Zhu
>
> 2015-12-07 14:30 GMT-08:00 Richard Marscher <rmarsc...@localytics.com>:
>
>> Hi,
>>
>> I've been running benchmarks against Spark in local mode in a long
>> running process. I'm seeing threads leaking each time it runs a job. It
>> doesn't matter if I recycle SparkContext constantly or have 1 context stay
>> alive for the entire application lifetime.
>>
>> I see a huge accumulation ongoing of "pool--thread-1" threads with
>> the creating thread "Executor task launch worker-xx" where x's are numbers.
>> The number of leaks per launch worker varies but usually 1 to a few.
>>
>> Searching the Spark code the pool is created in the Executor class. It is
>> `.shutdown()` in the stop for the executor. I've wired up logging and also
>> tried shutdownNow() and awaitTermination on the pools. Everything seems okay
>> there for every Executor that is called with `stop()`, but I'm still not
>> sure yet if every Executor is called as such, which I am looking into now.
>>
>> What I'm curious to know is if anyone has seen a similar issue?
>>
>> --
>> *Richard Marscher*
>> Software Engineer
>> Localytics
>> Localytics.com <http://localytics.com/> | Our Blog
>> <http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
>> Facebook <http://facebook.com/localytics> | LinkedIn
>> <http://www.linkedin.com/company/1148792?trk=tyah>
>>
>
>


-- 
*Richard Marscher*
Software Engineer
Localytics
Localytics.com <http://localytics.com/> | Our Blog
<http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
Facebook <http://facebook.com/localytics> | LinkedIn
<http://www.linkedin.com/company/1148792?trk=tyah>


Local mode: Stages hang for minutes

2015-12-03 Thread Richard Marscher
Hi,

I'm doing some testing of workloads using local mode on a server. I see
weird behavior where a job is submitted to the application and it just
hangs for several minutes doing nothing. The stages are submitted as
pending and in the application UI the stage view claims no tasks have been
submitted. Then, after a few minutes, things suddenly start and run
smoothly.

I'm running against tiny data sets the size of 10s to low 100s of items in
the RDD. I've been attaching with JProfiler, doing thread and heap dumps
but nothing is really standing out as to why Spark seems to periodically
pause for such a long time.

Has anyone else seen similar behavior or aware of some quirk of local mode
that could cause this kind of blocking?

-- 
*Richard Marscher*
Software Engineer
Localytics
Localytics.com <http://localytics.com/> | Our Blog
<http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
Facebook <http://facebook.com/localytics> | LinkedIn
<http://www.linkedin.com/company/1148792?trk=tyah>


Re: Local mode: Stages hang for minutes

2015-12-03 Thread Richard Marscher
I should add that the pauses are not from GC. Also, in tracing the CPU
call tree in the JVM, it seems like nothing is doing any work; it just seems
to be idling or blocking.

On Thu, Dec 3, 2015 at 11:24 AM, Richard Marscher <rmarsc...@localytics.com>
wrote:

> Hi,
>
> I'm doing some testing of workloads using local mode on a server. I see
> weird behavior where a job is submitted to the application and it just
> hangs for several minutes doing nothing. The stages are submitted as
> pending and in the application UI the stage view claims no tasks have been
> submitted. Then, after a few minutes, things suddenly start and run
> smoothly.
>
> I'm running against tiny data sets the size of 10s to low 100s of items in
> the RDD. I've been attaching with JProfiler, doing thread and heap dumps
> but nothing is really standing out as to why Spark seems to periodically
> pause for such a long time.
>
> Has anyone else seen similar behavior or aware of some quirk of local mode
> that could cause this kind of blocking?
>
> --
> *Richard Marscher*
> Software Engineer
> Localytics
> Localytics.com <http://localytics.com/> | Our Blog
> <http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
> Facebook <http://facebook.com/localytics> | LinkedIn
> <http://www.linkedin.com/company/1148792?trk=tyah>
>



-- 
*Richard Marscher*
Software Engineer
Localytics
Localytics.com <http://localytics.com/> | Our Blog
<http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
Facebook <http://facebook.com/localytics> | LinkedIn
<http://www.linkedin.com/company/1148792?trk=tyah>


Re: Local mode: Stages hang for minutes

2015-12-03 Thread Richard Marscher
Ended up realizing I was only looking at the call tree for running threads.
After looking at blocked threads, I saw that it was spending hundreds of
compute hours blocking on jets3t calls to S3. It turned out it was traversing
likely thousands, if not hundreds of thousands, of S3 files accumulated over
many rounds of load testing. Cleaning up the files fixed the issue, and I'm
pretty sure it's already well known that the underlying s3n doesn't handle
traversing a large S3 file tree with the sparkContext.textFile function
using wildcards well.
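
For context, the access pattern in question looked roughly like this (the
bucket and layout are made up):

// Globbing over a very large s3n tree is what produced the long pauses described above.
val events = sc.textFile("s3n://my-bucket/load-tests/*/part-*")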

On Thu, Dec 3, 2015 at 12:57 PM, Ali Tajeldin EDU <alitedu1...@gmail.com>
wrote:

> You can try to run "jstack" a couple of times while the app is hung to
> look for patterns  for where the app is hung.
> --
> Ali
>
>
> On Dec 3, 2015, at 8:27 AM, Richard Marscher <rmarsc...@localytics.com>
> wrote:
>
> I should add that the pauses are not from GC and also in tracing the CPU
> call tree in the JVM it seems like nothing is doing any work, just seems to
> be idling or blocking.
>
> On Thu, Dec 3, 2015 at 11:24 AM, Richard Marscher <
> rmarsc...@localytics.com> wrote:
>
>> Hi,
>>
>> I'm doing some testing of workloads using local mode on a server. I see
>> weird behavior where a job is submitted to the application and it just
>> hangs for several minutes doing nothing. The stages are submitted as
>> pending and in the application UI the stage view claims no tasks have been
>> submitted. Then, after a few minutes, things suddenly start and run
>> smoothly.
>>
>> I'm running against tiny data sets the size of 10s to low 100s of items
>> in the RDD. I've been attaching with JProfiler, doing thread and heap dumps
>> but nothing is really standing out as to why Spark seems to periodically
>> pause for such a long time.
>>
>> Has anyone else seen similar behavior or aware of some quirk of local
>> mode that could cause this kind of blocking?
>>
>> --
>> *Richard Marscher*
>> Software Engineer
>> Localytics
>> Localytics.com <http://localytics.com/> | Our Blog
>> <http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
>> Facebook <http://facebook.com/localytics> | LinkedIn
>> <http://www.linkedin.com/company/1148792?trk=tyah>
>>
>
>
>
> --
> *Richard Marscher*
> Software Engineer
> Localytics
> Localytics.com <http://localytics.com/> | Our Blog
> <http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
> Facebook <http://facebook.com/localytics> | LinkedIn
> <http://www.linkedin.com/company/1148792?trk=tyah>
>
>
>


-- 
*Richard Marscher*
Software Engineer
Localytics
Localytics.com <http://localytics.com/> | Our Blog
<http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
Facebook <http://facebook.com/localytics> | LinkedIn
<http://www.linkedin.com/company/1148792?trk=tyah>


Re: New to Spark - Paritioning Question

2015-09-09 Thread Richard Marscher
Ah I see. In that case, the groupByKey function does guarantee every key is
on exactly one partition matched with the aggregated data. This can be
improved depending on what you want to do after. Group by key only
aggregates the data after shipping it across the cluster. Meanwhile, using
reduceByKey will do aggregation on each node first, then ship those results
to the final node and partition to finalize the aggregation there. If that
makes sense.

So say Node 1 has pairs: (a, 1), (b, 2), (b, 6)
Node 2 has pairs: (a, 2), (a,3), (b, 4)

groupBy would send both the a pairs and the b pairs across the network. If you
did reduceByKey with a sum aggregate, then you'd expect it to ship, say, (b, 8)
from Node 1 or (a, 5) from Node 2, since it did the local aggregation first.
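
The same example in code, assuming a SparkContext sc (which partition each
pair starts on is up to Spark, so treat this as illustrative):

val pairs = sc.parallelize(Seq("a" -> 1, "b" -> 2, "b" -> 6, "a" -> 2, "a" -> 3, "b" -> 4))

val grouped = pairs.groupByKey()        // ships every (key, value) pair before aggregating
val reduced = pairs.reduceByKey(_ + _)  // pre-aggregates within each partition, then ships partial sums

// reduced.collect() => Array(("a", 6), ("b", 12))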

You are correct that doing something with expensive side-effects like
writing to a database (connections and network + I/O) is best done with the
mapPartitions or foreachPartition type of functions on RDD so you can share
a database connection and also potentially do things like batch statements.
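
Continuing the sketch above, the foreachPartition pattern looks roughly like
this; DbConnection and the batch size are hypothetical:

reduced.foreachPartition { iter =>
  val conn = DbConnection.open()  // hypothetical helper: one connection per partition, not per record
  try {
    iter.grouped(100).foreach(batch => conn.insertBatch(batch))  // hypothetical batched writes
  } finally {
    conn.close()
  }
}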


On Tue, Sep 8, 2015 at 7:37 PM, Mike Wright <mwri...@snl.com> wrote:

> Thanks for the response!
>
> Well, in retrospect each partition doesn't need to be restricted to a
> single key. But, I cannot have values associated with a key span partitions
> since they all need to be processed together for a key to facilitate
> cumulative calcs. So provided an individual key has all its values in a
> single partition, I'm OK.
>
> Additionally, the values will be written to the database, and from what I
> have read doing this at the partition level is the best compromise between
> 1) Writing the calculated values for each key (lots of connect/disconnects)
> and collecting them all at the end and writing them all at once.
>
> I am using a groupBy against the filtered RDD the get the grouping I want,
> but apparently this may not be the most efficient way, and it seems that
> everything is always in a single partition under this scenario.
>
>
> ___
>
> *Mike Wright*
> Principal Architect, Software Engineering
>
> SNL Financial LC
> 434-951-7816 *p*
> 434-244-4466 *f*
> 540-470-0119 *m*
>
> mwri...@snl.com
>
> On Tue, Sep 8, 2015 at 5:38 PM, Richard Marscher <rmarsc...@localytics.com
> > wrote:
>
>> That seems like it could work, although I don't think `partitionByKey` is
>> a thing, at least for RDD. You might be able to merge step #2 and step #3
>> into one step by using the `reduceByKey` function signature that takes in a
>> Partitioner implementation.
>>
>> def reduceByKey(partitioner: Partitioner, func: (V, V) ⇒ V): RDD[(K, V)]
>> (Partitioner: <http://spark.apache.org/docs/latest/api/scala/org/apache/spark/Partitioner.html>,
>> RDD: <http://spark.apache.org/docs/latest/api/scala/org/apache/spark/rdd/RDD.html>)
>>
>> Merge the values for each key using an associative reduce function. This
>> will also perform the merging locally on each mapper before sending results
>> to a reducer, similarly to a "combiner" in MapReduce.
>>
>> The tricky part might be getting the partitioner to know about the number
>> of partitions, which I think it needs to know upfront in `abstract def
>> numPartitions: Int`. The `HashPartitioner` for example takes in the
>> number as a constructor argument, maybe you could use that with an upper
>> bound size if you don't mind empty partitions. Otherwise you might have to
>> mess around to extract the exact number of keys if it's not readily
>> available.
>>
>> Aside: what is the requirement to have each partition only contain the
>> data related to one key?
>>
>> On Fri, Sep 4, 2015 at 11:06 AM, mmike87 <mwri...@snl.com> wrote:
>>
>>> Hello, I am new to Apache Spark and this is my company's first Spark
>>> project.
>>> Essentially, we are calculating models dealing with Mining data using
>>> Spark.
>>>
>>> I am holding all the source data in a persisted RDD that we will refresh
>>> periodically. When a "scenario" is passed to the Spark job (we're using
>>> Job
>>> Server) the persisted RDD is filtered to the relevant mines. For
>>> example, we
>>> may want all mines in Chile and the 1990-2015 data for each.
>>>
>>> Many of the calculations are cumulative, that is when we apply user-input
>>> "adjustment factors" to a value, we also need the "flexed" value we
>>> calculated for that mine previously.
>>>
>>> To ensure that this works, the idea if to:
>>>
>>> 1) Filter the superset to relevant mines (done)
>>> 2) Group the subset by the 

Re: What should be the optimal value for spark.sql.shuffle.partition?

2015-09-09 Thread Richard Marscher
I see you reposted with more details:

"I have 2 TB of
skewed data to process and then convert rdd into dataframe and use it as
table in hiveContext.sql(). I am using 60 executors with 20 GB memory and 4
cores."

If I'm reading that correctly, you have 2TB of data and 1.2TB of memory in
the cluster. I think that's a fundamental problem up front. If it's skewed
then that will be even worse for doing aggregation. I think to start the
data either needs to be broken down or the cluster upgraded unfortunately.

On Wed, Sep 9, 2015 at 5:41 PM, Richard Marscher <rmarsc...@localytics.com>
wrote:

> Do you have any details about the cluster you are running this against?
> The memory per executor/node, number of executors, and such? Even at a
> shuffle setting of 1000 that would be roughly 1GB per partition assuming
> the 1TB of data includes overheads in the JVM. Maybe try another order of
> magnitude higher for number of shuffle partitions and see where that gets
> you?
>
> On Tue, Sep 1, 2015 at 12:11 PM, unk1102 <umesh.ka...@gmail.com> wrote:
>
>> Hi I am using Spark SQL actually hiveContext.sql() which uses group by
>> queries and I am running into OOM issues. So thinking of increasing value
>> of
>> spark.sql.shuffle.partition from 200 default to 1000 but it is not
>> helping.
>> Please correct me if I am wrong this partitions will share data shuffle
>> load
>> so more the partitions less data to hold. Please guide I am new to Spark.
>> I
>> am using Spark 1.4.0 and I have around 1TB of uncompressed data to process
>> using hiveContext.sql() group by queries.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/What-should-be-the-optimal-value-for-spark-sql-shuffle-partition-tp24543.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> ---------
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>
> --
> *Richard Marscher*
> Software Engineer
> Localytics
> Localytics.com <http://localytics.com/> | Our Blog
> <http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
> Facebook <http://facebook.com/localytics> | LinkedIn
> <http://www.linkedin.com/company/1148792?trk=tyah>
>



-- 
*Richard Marscher*
Software Engineer
Localytics
Localytics.com <http://localytics.com/> | Our Blog
<http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
Facebook <http://facebook.com/localytics> | LinkedIn
<http://www.linkedin.com/company/1148792?trk=tyah>


Re: What should be the optimal value for spark.sql.shuffle.partition?

2015-09-09 Thread Richard Marscher
Do you have any details about the cluster you are running this against? The
memory per executor/node, number of executors, and such? Even at a shuffle
setting of 1000 that would be roughly 1GB per partition assuming the 1TB of
data includes overheads in the JVM. Maybe try another order of magnitude
higher for number of shuffle partitions and see where that gets you?
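
For reference, bumping the setting looks like this (the value is only a guess
to be tuned; note the conf key is spark.sql.shuffle.partitions):

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)  // assumes an existing SparkContext sc
hiveContext.setConf("spark.sql.shuffle.partitions", "10000")  // an order of magnitude above the 1000 tried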

On Tue, Sep 1, 2015 at 12:11 PM, unk1102 <umesh.ka...@gmail.com> wrote:

> Hi I am using Spark SQL actually hiveContext.sql() which uses group by
> queries and I am running into OOM issues. So thinking of increasing value
> of
> spark.sql.shuffle.partition from 200 default to 1000 but it is not helping.
> Please correct me if I am wrong this partitions will share data shuffle
> load
> so more the partitions less data to hold. Please guide I am new to Spark. I
> am using Spark 1.4.0 and I have around 1TB of uncompressed data to process
> using hiveContext.sql() group by queries.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/What-should-be-the-optimal-value-for-spark-sql-shuffle-partition-tp24543.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
*Richard Marscher*
Software Engineer
Localytics
Localytics.com <http://localytics.com/> | Our Blog
<http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
Facebook <http://facebook.com/localytics> | LinkedIn
<http://www.linkedin.com/company/1148792?trk=tyah>


Re: spark.shuffle.spill=false ignored?

2015-09-09 Thread Richard Marscher
Hi Eric,

I just wanted to do a sanity check, do you know what paths it is trying to
write to? I ask because even without spilling, shuffles always write to
disk first before transferring data across the network. I had at one point
encountered this myself where we accidentally had /tmp mounted on a tiny
disk and kept running out of disk on shuffles even though we also don't
spill. You may have already considered or ruled this out though.
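
If it does turn out to be the shuffle scratch directory, pointing it at a
larger volume is just a config change; the path here is made up:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.local.dir", "/mnt/big-volume/spark-tmp")  // where shuffle files are written before transfer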

On Thu, Sep 3, 2015 at 12:56 PM, Eric Walker <eric.wal...@gmail.com> wrote:

> Hi,
>
> I am using Spark 1.3.1 on EMR with lots of memory.  I have attempted to
> run a large pyspark job several times, specifying
> `spark.shuffle.spill=false` in different ways.  It seems that the setting
> is ignored, at least partially, and some of the tasks start spilling large
> amounts of data to disk.  The job has been fast enough in the past, but
> once it starts spilling to disk it lands on Miller's planet [1].
>
> Is this expected behavior?  Is it a misconfiguration on my part, e.g.,
> could there be an incompatible setting that is overriding
> `spark.shuffle.spill=false`?  Is it something that goes back to Spark
> 1.3.1?  Is it something that goes back to EMR?  When I've allowed the job
> to continue on for a while, I've started to see Kryo stack traces in the
> tasks that are spilling to disk.  The stack traces mention there not being
> enough disk space, although a `df` shows plenty of space (perhaps after the
> fact, when temporary files have been cleaned up).
>
> Has anyone run into something like this before?  I would be happy to see
> OOM errors, because that would be consistent with one understanding of what
> might be going on, but I haven't yet.
>
> Eric
>
>
> [1] https://www.youtube.com/watch?v=v7OVqXm7_Pk=active
>



-- 
*Richard Marscher*
Software Engineer
Localytics
Localytics.com <http://localytics.com/> | Our Blog
<http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
Facebook <http://facebook.com/localytics> | LinkedIn
<http://www.linkedin.com/company/1148792?trk=tyah>


Re: Why is huge data shuffling in Spark when using union()/coalesce(1,false) on DataFrame?

2015-09-08 Thread Richard Marscher
Hi,

what is the reasoning behind the use of `coalesce(1,false)`? This is saying
to aggregate all data into a single partition, which must fit in memory on
one node in the Spark cluster. If the cluster has more than one node it
must shuffle to move the data. It doesn't seem like the following map or
union necessitate coalesce, but the use case is not clear to me.

On Fri, Sep 4, 2015 at 12:29 PM, unk1102 <umesh.ka...@gmail.com> wrote:

> Hi I have Spark job which does some processing on ORC data and stores back
> ORC data using DataFrameWriter save() API introduced in Spark 1.4.0. I have
> the following piece of code which is using heavy shuffle memory. How do I
> optimize below code? Is there anything wrong with it? It is working fine as
> expected only causing slowness because of GC pause and shuffles lots of
> data
> so hitting memory issues. Please guide I am new to Spark. Thanks in
> advance.
>
> JavaRDD updatedDsqlRDD = orderedFrame.toJavaRDD().coalesce(1,
> false).map(new Function<Row, Row>() {
>@Override
>public Row call(Row row) throws Exception {
> List rowAsList;
> Row row1 = null;
> if (row != null) {
>   rowAsList = iterate(JavaConversions.seqAsJavaList(row.toSeq()));
>   row1 = RowFactory.create(rowAsList.toArray());
> }
> return row1;
>}
> }).union(modifiedRDD);
> DataFrame updatedDataFrame =
> hiveContext.createDataFrame(updatedDsqlRDD,renamedSourceFrame.schema());
>
> updatedDataFrame.write().mode(SaveMode.Append).format("orc").partitionBy("entity",
> "date").save("baseTable");
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Why-is-huge-data-shuffling-in-Spark-when-using-union-coalesce-1-false-on-DataFrame-tp24581.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
*Richard Marscher*
Software Engineer
Localytics
Localytics.com <http://localytics.com/> | Our Blog
<http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
Facebook <http://facebook.com/localytics> | LinkedIn
<http://www.linkedin.com/company/1148792?trk=tyah>


Re: New to Spark - Paritioning Question

2015-09-08 Thread Richard Marscher
That seems like it could work, although I don't think `partitionByKey` is a
thing, at least for RDD. You might be able to merge step #2 and step #3
into one step by using the `reduceByKey` function signature that takes in a
Partitioner implementation.

def reduceByKey(partitioner: Partitioner, func: (V, V) ⇒ V): RDD[(K, V)]
(Partitioner: <http://spark.apache.org/docs/latest/api/scala/org/apache/spark/Partitioner.html>,
RDD: <http://spark.apache.org/docs/latest/api/scala/org/apache/spark/rdd/RDD.html>)

Merge the values for each key using an associative reduce function. This
will also perform the merging locally on each mapper before sending results
to a reducer, similarly to a "combiner" in MapReduce.

The tricky part might be getting the partitioner to know about the number
of partitions, which I think it needs to know upfront in `abstract def
numPartitions: Int`. The `HashPartitioner` for example takes in the number
as a constructor argument, maybe you could use that with an upper bound
size if you don't mind empty partitions. Otherwise you might have to mess
around to extract the exact number of keys if it's not readily available.
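
To make that concrete, a minimal sketch; the record type and the key-count
upper bound are assumptions:

import org.apache.spark.HashPartitioner

case class MineRow(mineId: Int, year: Int, value: Double)  // hypothetical record type

val byMine = mineRows                            // an RDD[MineRow], already filtered as in step 1
  .map(r => (r.mineId, Vector(r)))
  .reduceByKey(new HashPartitioner(64), _ ++ _)  // 64 = assumed upper bound on the number of mines
// Each resulting partition holds whole keys (possibly several of them, possibly none).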

Aside: what is the requirement to have each partition only contain the data
related to one key?

On Fri, Sep 4, 2015 at 11:06 AM, mmike87 <mwri...@snl.com> wrote:

> Hello, I am new to Apache Spark and this is my company's first Spark
> project.
> Essentially, we are calculating models dealing with Mining data using
> Spark.
>
> I am holding all the source data in a persisted RDD that we will refresh
> periodically. When a "scenario" is passed to the Spark job (we're using Job
> Server) the persisted RDD is filtered to the relevant mines. For example,
> we
> may want all mines in Chile and the 1990-2015 data for each.
>
> Many of the calculations are cumulative, that is when we apply user-input
> "adjustment factors" to a value, we also need the "flexed" value we
> calculated for that mine previously.
>
> To ensure that this works, the idea if to:
>
> 1) Filter the superset to relevant mines (done)
> 2) Group the subset by the unique identifier for the mine. So, a group may
> be all the rows for mine "A" for 1990-2015
> 3) I then want to ensure that the RDD is partitioned by the Mine Identifier
> (and Integer).
>
> It's step 3 that is confusing me. I suspect it's very easy ... do I simply
> use PartitionByKey?
>
> We're using Java if that makes any difference.
>
> Thanks!
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/New-to-Spark-Paritioning-Question-tp24580.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -----
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
*Richard Marscher*
Software Engineer
Localytics
Localytics.com <http://localytics.com/> | Our Blog
<http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
Facebook <http://facebook.com/localytics> | LinkedIn
<http://www.linkedin.com/company/1148792?trk=tyah>


Re: Removing empty partitions before we write to HDFS

2015-08-06 Thread Richard Marscher
Not that I'm aware of. We ran into the similar issue where we didn't want
to keep accumulating all these empty part files in storage on S3 or HDFS.
There didn't seem to be any performance free way to do it with an RDD, so
we just run a non-spark post-batch operation to delete empty files from the
write path.

On Thu, Aug 6, 2015 at 3:33 PM, Patanachai Tangchaisin patanac...@ipsy.com
wrote:

 Currently, I use rdd.isEmpty()

 Thanks,
 Patanachai



 On 08/06/2015 12:02 PM, gpatcham wrote:

 Is there a way to filter out empty partitions before I write to HDFS other
 than using reparition and colasce ?




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Removing-empty-partitions-before-we-write-to-HDFS-tp24156.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


 --
 Patanachai



 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
*Richard Marscher*
Software Engineer
Localytics
Localytics.com http://localytics.com/ | Our Blog
http://localytics.com/blog | Twitter http://twitter.com/localytics |
Facebook http://facebook.com/localytics | LinkedIn
http://www.linkedin.com/company/1148792?trk=tyah


Re: Does RDD.cartesian involve shuffling?

2015-08-04 Thread Richard Marscher
Yes it does, in fact it's probably going to be one of the more expensive
shuffles you could trigger.

On Mon, Aug 3, 2015 at 12:56 PM, Meihua Wu rotationsymmetr...@gmail.com
wrote:

 Does RDD.cartesian involve shuffling?

 Thanks!

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
*Richard Marscher*
Software Engineer
Localytics
Localytics.com http://localytics.com/ | Our Blog
http://localytics.com/blog | Twitter http://twitter.com/localytics |
Facebook http://facebook.com/localytics | LinkedIn
http://www.linkedin.com/company/1148792?trk=tyah


Re: Repartition question

2015-08-04 Thread Richard Marscher
Hi,

It is possible to control the number of partitions for the RDD without
calling repartition by setting the max split size for the hadoop input
format used. Tracing through the code, XmlInputFormat extends
FileInputFormat which determines the number of splits (which NewHadoopRdd
uses to determine number of partitions:
https://github.com/apache/spark/blob/branch-1.3/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L95)
with a few configs:
https://github.com/apache/hadoop/blob/branch-2.3/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java#L200
.

public static final String SPLIT_MAXSIZE =
    "mapreduce.input.fileinputformat.split.maxsize";
public static final String SPLIT_MINSIZE =
    "mapreduce.input.fileinputformat.split.minsize";
If you are setting SparkConf fields, prefix the keys with spark.hadoop and
they will end up on the Hadoop conf used for the above values.
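
A short sketch of that; the 16 MB target split size is arbitrary:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("split-size-example")
  // spark.hadoop.* keys get copied onto the Hadoop Configuration the input format sees
  .set("spark.hadoop.mapreduce.input.fileinputformat.split.maxsize", (16 * 1024 * 1024).toString)
val sc = new SparkContext(conf)
// A ~100 MB input should now come in as roughly 7 partitions instead of 4.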

On Tue, Aug 4, 2015 at 12:31 AM, Naveen Madhire vmadh...@umail.iu.edu
wrote:

 Hi All,

 I am running the WikiPedia parsing example present in the Advance
 Analytics with Spark book.


 https://github.com/sryza/aas/blob/d3f62ef3ed43a59140f4ae8afbe2ef81fc643ef2/ch06-lsa/src/main/scala/com/cloudera/datascience/lsa/ParseWikipedia.scala#l112


 The partitions of the RDD returned by the readFile function (mentioned
 above) is of 32MB size. So if my file size is 100 MB, RDD is getting
 created with 4 partitions with approx 32MB  size.


 I am running this in a standalone spark cluster mode, every thing is
 working fine only little confused about the nbr of partitions and the size.

 I want to increase the nbr of partitions for the RDD to make use of the
 cluster. Is calling repartition() after this the only option or can I pass
 something in the above method to have more partitions of the RDD.

 Please let me know.

 Thanks.




-- 
*Richard Marscher*
Software Engineer
Localytics
Localytics.com http://localytics.com/ | Our Blog
http://localytics.com/blog | Twitter http://twitter.com/localytics |
Facebook http://facebook.com/localytics | LinkedIn
http://www.linkedin.com/company/1148792?trk=tyah


Re: Does RDD.cartesian involve shuffling?

2015-08-04 Thread Richard Marscher
That is the only alternative I'm aware of. If either A or B is small
enough to broadcast, then you'd at least be doing the cartesian products all
locally without needing to also transmit and shuffle A. That is, unless Spark
somehow optimizes the cartesian product and only transfers the smaller RDD
across the network in the shuffle, but I don't have reason to believe that's
true.

I'd try the cartesian first if you haven't tried at all, just to make sure
it actually is too slow before getting tricky with the broadcast.
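
A rough sketch of the broadcast route, with made-up data and a made-up
pairwise function:

val aRdd = sc.parallelize(1 to 1000)         // the large side, A
val bValues = sc.broadcast(Seq(10, 20, 30))  // B, small enough to ship whole to every executor

// The pairwise work happens locally on A's partitions; A itself is never shuffled.
val result = aRdd.flatMap(a => bValues.value.map(b => (a, b, a * b)))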

On Tue, Aug 4, 2015 at 12:25 PM, Meihua Wu rotationsymmetr...@gmail.com
wrote:

 Thanks, Richard!

 I basically have two RDD's: A and B; and I need to compute a value for
 every pair of (a, b) for a in A and b in B. My first thought is
 cartesian, but involves expensive shuffle.

 Any alternatives? How about I convert B to an array and broadcast it
 to every node (assuming B is relative small to fit)?



 On Tue, Aug 4, 2015 at 8:23 AM, Richard Marscher
 rmarsc...@localytics.com wrote:
  Yes it does, in fact it's probably going to be one of the more expensive
  shuffles you could trigger.
 
  On Mon, Aug 3, 2015 at 12:56 PM, Meihua Wu rotationsymmetr...@gmail.com
 
  wrote:
 
  Does RDD.cartesian involve shuffling?
 
  Thanks!
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 
 
 
 
  --
  Richard Marscher
  Software Engineer
  Localytics
  Localytics.com | Our Blog | Twitter | Facebook | LinkedIn




-- 
*Richard Marscher*
Software Engineer
Localytics
Localytics.com http://localytics.com/ | Our Blog
http://localytics.com/blog | Twitter http://twitter.com/localytics |
Facebook http://facebook.com/localytics | LinkedIn
http://www.linkedin.com/company/1148792?trk=tyah


Re: How to increase parallelism of a Spark cluster?

2015-08-04 Thread Richard Marscher
) is to use
 DefaultHttpClient (from HttpComponents) whose settings are as follows:

 - Version: HttpVersion.HTTP_1_1
 - ContentCharset: HTTP.DEFAULT_CONTENT_CHARSET
 - NoTcpDelay: true
 - SocketBufferSize: 8192
 - UserAgent: Apache-HttpClient/release (java 1.5)

 In addition, the Solr code sets the following additional config
 parameters on the DefaultHttpClient.

   params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, 128);
   params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, 32);
   params.set(HttpClientUtil.PROP_FOLLOW_REDIRECTS, followRedirects);

 Since all my connections are coming out of 2 worker boxes, it looks like
 I could get 32x2 = 64 clients hitting Solr, right?

 @Steve: Thanks for the link to the HttpClient config. I was thinking
 about using a thread pool (or better using a PoolingHttpClientManager per
 the docs), but it probably won't help since its still being fed one request
 at a time.
 @Abhishek: my observations agree with what you said. In the past I have
 had success with repartition to reduce the partition size especially when
 groupBy operations were involved. But I believe an executor should be able
 to handle multiple tasks in parallel from what I understand about Akka on
 which Spark is built - the worker is essentially an ActorSystem which can
 contain multiple Actors, each actor works on a queue of tasks. Within an
 Actor everything is sequential, but the ActorSystem is responsible for
 farming out tasks it gets to each of its Actors. Although it is possible I
 could be generalizing incorrectly from my limited experience with Akka.

 Thanks again for all your help. Please let me know if something jumps out
 and/or if there is some configuration I should check.

 -sujit



 On Sun, Aug 2, 2015 at 6:13 PM, Abhishek R. Singh 
 abhis...@tetrationanalytics.com wrote:

 I don't know if (your assertion/expectation that) workers will process
 things (multiple partitions) in parallel is really valid. Or if having more
 partitions than workers will necessarily help (unless you are memory bound
 - so partitions is essentially helping your work size rather than execution
 parallelism).

 [Disclaimer: I am no authority on Spark, but wanted to throw my spin
 based my own understanding].

 Nothing official about it :)

 -abhishek-

 On Jul 31, 2015, at 1:03 PM, Sujit Pal sujitatgt...@gmail.com wrote:

 Hello,

 I am trying to run a Spark job that hits an external webservice to get
 back some information. The cluster is 1 master + 4 workers, each worker has
 60GB RAM and 4 CPUs. The external webservice is a standalone Solr server,
 and is accessed using code similar to that shown below.

 def getResults(keyValues: Iterator[(String, Array[String])]):
 Iterator[(String, String)] = {
 val solr = new HttpSolrClient()
 initializeSolrParameters(solr)
 keyValues.map(keyValue => (keyValue._1, process(solr, keyValue)))
 }
 myRDD.repartition(10)

   .mapPartitions(keyValues => getResults(keyValues))


 The mapPartitions does some initialization to the SolrJ client per
 partition and then hits it for each record in the partition via the
 getResults() call.

 I repartitioned in the hope that this will result in 10 clients hitting
 Solr simultaneously (I would like to go upto maybe 30-40 simultaneous
 clients if I can). However, I counted the number of open connections using
 netstat -anp | grep :8983.*ESTABLISHED in a loop on the Solr box and
 observed that Solr has a constant 4 clients (ie, equal to the number of
 workers) over the lifetime of the run.

 My observation leads me to believe that each worker processes a single
 stream of work sequentially. However, from what I understand about how
 Spark works, each worker should be able to process number of tasks
 parallelly, and that repartition() is a hint for it to do so.

 Is there some SparkConf environment variable I should set to increase
 parallelism in these workers, or should I just configure a cluster with
 multiple workers per machine? Or is there something I am doing wrong?

 Thank you in advance for any pointers you can provide.

 -sujit






-- 
*Richard Marscher*
Software Engineer
Localytics
Localytics.com http://localytics.com/ | Our Blog
http://localytics.com/blog | Twitter http://twitter.com/localytics |
Facebook http://facebook.com/localytics | LinkedIn
http://www.linkedin.com/company/1148792?trk=tyah


Re: user threads in executors

2015-07-21 Thread Richard Marscher
You can certainly create threads in a map transformation. We do this to do
concurrent DB lookups during one stage for example. I would recommend,
however, that you switch to mapPartitions from map as this allows you to
create a fixed size thread pool to share across items on a partition as
opposed to spawning a future per record in the RDD for example.
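
A rough sketch of what I mean, where `events`, `postEvent`, the pool size and
the timeout are all placeholders for your own stream and HTTP call:

import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

val posted = events.mapPartitions { records =>
  val pool = Executors.newFixedThreadPool(8)          // one fixed pool shared by the whole partition
  implicit val ec = ExecutionContext.fromExecutorService(pool)
  val futures = records.map(r => Future(postEvent(r))).toList  // up to 8 posts in flight at once
  val results = futures.map(f => Await.result(f, 10.minutes))
  pool.shutdown()
  results.iterator
}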

On Tue, Jul 21, 2015 at 4:11 AM, Shushant Arora shushantaror...@gmail.com
wrote:

 Hi

 Can I create user threads in executors.
 I have a streaming app where after processing I have a requirement to push
 events to external system . Each post request costs ~90-100 ms.

 To make post parllel, I can not use same thread because that is limited by
 no of cores available in system , can I useuser therads in spark App? I
 tried to create 2 thredas in a map tasks and it worked.

 Is there any upper limit on no of user threds in spark executor ? Is it a
 good idea to create user threads in spark map task?

 Thanks




-- 
*Richard Marscher*
Software Engineer
Localytics
Localytics.com http://localytics.com/ | Our Blog
http://localytics.com/blog | Twitter http://twitter.com/localytics |
Facebook http://facebook.com/localytics | LinkedIn
http://www.linkedin.com/company/1148792?trk=tyah


Re: Apache Spark : Custom function for reduceByKey - missing arguments for method

2015-07-10 Thread Richard Marscher
Did you try it by adding the `_` after the method names to partially apply
them? Scala is saying that it's trying to immediately apply those methods
but can't find arguments.  But you instead are trying to pass them along as
functions (which they aren't). Here is a link to a stackoverflow answer
that should help clarify: http://stackoverflow.com/a/19720808/72401. I
think there are two solutions: turn getMax and getMin into functions by
defining them as vals, like this:

val getMax: (DoubleDimension, DoubleDimension) => DoubleDimension = { (a, b) =>
  if (a > b) a
  else b
}

val getMin: (DoubleDimension, DoubleDimension) => DoubleDimension = { (a, b) =>
  if (a < b) a
  else b
}

or just partially apply them:

maxVector = attribMap.reduceByKey( getMax _)
minVector = attribMap.reduceByKey( getMin _)

On Thu, Jul 9, 2015 at 9:09 PM, ameyamm ameya.malond...@outlook.com wrote:

 I am trying to normalize a dataset (convert values for all attributes in
 the
 vector to 0-1 range). I created an RDD of tuple (attrib-name,
 attrib-value) for all the records in the dataset as follows:

 val attribMap : RDD[(String, DoubleDimension)] = contactDataset.flatMap(
   contact => {
     List(
       ("dage", contact.dage match { case Some(value) => DoubleDimension(value); case None => null }),
       ("dancstry1", contact.dancstry1 match { case Some(value) => DoubleDimension(value); case None => null }),
       ("dancstry2", contact.dancstry2 match { case Some(value) => DoubleDimension(value); case None => null }),
       ("ddepart", contact.ddepart match { case Some(value) => DoubleDimension(value); case None => null }),
       ("dhispanic", contact.dhispanic match { case Some(value) => DoubleDimension(value); case None => null }),
       ("dhour89", contact.dhour89 match { case Some(value) => DoubleDimension(value); case None => null })
     )
   }
 )

 Here, contactDataset is of the type RDD[Contact]. The fields of Contact
 class are of type Option[Long].

 DoubleDimension is a simple wrapper over Double datatype. It extends the
 Ordered trait and implements corresponding compare method and equals
 method.

 To obtain the max and min attribute vector for computing the normalized
 values,

 maxVector = attribMap.reduceByKey( getMax )
 minVector = attribMap.reduceByKey( getMin )

 Implementation of getMax and getMin is as follows:

 def getMax( a : DoubleDimension, b : DoubleDimension ) : DoubleDimension = {
   if (a > b) a
   else b
 }

 def getMin( a : DoubleDimension, b : DoubleDimension) : DoubleDimension = {
   if (a < b) a
   else b
 }

 I get a compile error at calls to the methods getMax and getMin stating:

 [ERROR] .../com/ameyamm/input_generator/DatasetReader.scala:117: error:
 missing arguments for method getMax in class DatasetReader;

 [ERROR] follow this method with '_' if you want to treat it as a partially
 applied function

 [ERROR] maxVector = attribMap.reduceByKey( getMax )

 [ERROR] .../com/ameyamm/input_generator/DatasetReader.scala:118: error:
 missing arguments for method getMin in class DatasetReader;

 [ERROR] follow this method with '_' if you want to treat it as a partially
 applied function

 [ERROR] minVector = attribMap.reduceByKey( getMin )

 I am not sure what I am doing wrong here. My RDD is an RDD of Pairs and as
 per my knowledge, I can pass any method to it as long as the function is of
 the type f: (V, V) => V.

 I am really stuck here. Please help.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Apache-Spark-Custom-function-for-reduceByKey-missing-arguments-for-method-tp23756.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
-- 
*Richard Marscher*
Software Engineer
Localytics
Localytics.com http://localytics.com/ | Our Blog
http://localytics.com/blog | Twitter http://twitter.com/localytics |
Facebook http://facebook.com/localytics | LinkedIn
http://www.linkedin.com/company/1148792?trk=tyah


Re: Unit tests of spark application

2015-07-10 Thread Richard Marscher
Unless you had something specific in mind, it should be as simple as
creating a SparkContext object using a master of local[2] in your tests
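
For example, a minimal sketch with ScalaTest (the class name and assertion are
just illustrative):

import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfterAll, FunSuite}

class WordCountSpec extends FunSuite with BeforeAndAfterAll {
  private var sc: SparkContext = _

  override def beforeAll(): Unit = {
    sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("test"))
  }

  override def afterAll(): Unit = {
    sc.stop()
  }

  test("counts words") {
    val counts = sc.parallelize(Seq("a", "b", "a"))
      .map(w => (w, 1))
      .reduceByKey(_ + _)
      .collectAsMap()
    assert(counts("a") == 2)
  }
}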

On Fri, Jul 10, 2015 at 1:41 PM, Naveen Madhire vmadh...@umail.iu.edu
wrote:

 Hi,

 I want to write junit test cases in scala for testing spark application.
 Is there any guide or link which I can refer.

 Thank you very much.

 -Naveen






-- 
-- 
*Richard Marscher*
Software Engineer
Localytics
Localytics.com http://localytics.com/ | Our Blog
http://localytics.com/blog | Twitter http://twitter.com/localytics |
Facebook http://facebook.com/localytics | LinkedIn
http://www.linkedin.com/company/1148792?trk=tyah


Re: Spark serialization in closure

2015-07-09 Thread Richard Marscher
Reading that article and applying it to your observations of what happens
at runtime:

shouldn't the closure require serializing testing? The foo singleton object
is a member of testing, and then you call this foo value in the closure
func and further in the foreachPartition closure. So following by that
article, Scala will attempt to serialize the containing object/class
testing to get the foo instance.
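
One way to sidestep that, following the same article, is to copy the value you
need into a local val inside the method so the closure only captures that
value and not the enclosing testing object. A rough sketch of your example
with that change:

object testing {
  object foo extends Serializable {
    val v = 42
  }
  val list = List(1, 2, 3)
  val rdd = sc.parallelize(list)
  def func = {
    val localFoo = foo                      // captured instead of the outer `testing`
    rdd.foreachPartition { it =>
      it.foreach(_ => println(localFoo.v))  // no reference back to `testing` in the closure
    }
  }
}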

On Thu, Jul 9, 2015 at 4:11 PM, Chen Song chen.song...@gmail.com wrote:

 Repost the code example,

 object testing extends Serializable {
 object foo {
   val v = 42
 }
 val list = List(1,2,3)
 val rdd = sc.parallelize(list)
 def func = {
   val after = rdd.foreachPartition {
  it => println(foo.v)
   }
 }
   }

 On Thu, Jul 9, 2015 at 4:09 PM, Chen Song chen.song...@gmail.com wrote:

 Thanks Erik. I saw the document too. That is why I am confused because as
 per the article, it should be good as long as *foo *is serializable.
 However, what I have seen is that it would work if *testing* is
 serializable, even foo is not serializable, as shown below. I don't know if
 there is something specific to Spark.

 For example, the code example below works.

 object testing extends Serializable {

 object foo {

   val v = 42

 }

 val list = List(1,2,3)

 val rdd = sc.parallelize(list)

 def func = {

   val after = rdd.foreachPartition {

  it => println(foo.v)

   }

 }

   }

 On Thu, Jul 9, 2015 at 12:11 PM, Erik Erlandson e...@redhat.com wrote:

 I think you have stumbled across this idiosyncrasy:


 http://erikerlandson.github.io/blog/2015/03/31/hygienic-closures-for-scala-function-serialization/




 - Original Message -
  I am not sure this is more of a question for Spark or just Scala but I
 am
  posting my question here.
 
  The code snippet below shows an example of passing a reference to a
 closure
  in rdd.foreachPartition method.
 
  ```
  object testing {
  object foo extends Serializable {
val v = 42
  }
  val list = List(1,2,3)
  val rdd = sc.parallelize(list)
  def func = {
val after = rdd.foreachPartition {
   it => println(foo.v)
}
  }
}
  ```
  When running this code, I got an exception
 
  ```
  Caused by: java.io.NotSerializableException:
  $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$
  Serialization stack:
  - object not serializable (class:
  $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$, value:
  $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$@10b7e824)
  - field (class:
 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$$anonfun$1,
  name: $outer, type: class
 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$)
  - object (class
 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$$anonfun$1,
  function1)
  ```
 
  It looks like Spark needs to serialize `testing` object. Why is it
  serializing testing even though I only pass foo (another serializable
  object) in the closure?
 
  A more general question is, how can I prevent Spark from serializing
 the
  parent class where RDD is defined, with still support of passing in
  function defined in other classes?
 
  --
  Chen Song
 




 --
 Chen Song




 --
 Chen Song




-- 
-- 
*Richard Marscher*
Software Engineer
Localytics
Localytics.com http://localytics.com/ | Our Blog
http://localytics.com/blog | Twitter http://twitter.com/localytics |
Facebook http://facebook.com/localytics | LinkedIn
http://www.linkedin.com/company/1148792?trk=tyah


Re: Create RDD from output of unix command

2015-07-08 Thread Richard Marscher
As a distributed data processing engine, Spark should be fine with millions
of lines. It's built with the idea of massive data sets in mind. Do you
have more details on how you anticipate the output of a unix command
interacting with a running Spark application? Do you expect Spark to be
continuously running and somehow observe unix command outputs? Or are you
thinking more along the lines of running a unix command with output and
then taking whatever format that is and running a spark job against it? If
it's the latter, it should be as simple as writing the command output to a
file and then loading the file into an RDD in Spark.
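
For the latter, a rough sketch (the command and paths are only examples; on a
real cluster the output should go to HDFS or another shared filesystem rather
than a local path):

import scala.sys.process._

// run the command and capture its output into a file Spark can read
("mycommand --dump-lines" #> new java.io.File("/tmp/command-output.txt")).!

val lines = sc.textFile("/tmp/command-output.txt")  // RDD[String], one element per line
println(lines.count())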

On Wed, Jul 8, 2015 at 2:02 PM, foobar heath...@fb.com wrote:

 What's the best practice of creating RDD from some external unix command
 output? I assume if the output size is large (say millions of lines),
 creating RDD from an array of all lines is not a good idea? Thanks!



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Create-RDD-from-output-of-unix-command-tp23723.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
-- 
*Richard Marscher*
Software Engineer
Localytics
Localytics.com http://localytics.com/ | Our Blog
http://localytics.com/blog | Twitter http://twitter.com/localytics |
Facebook http://facebook.com/localytics | LinkedIn
http://www.linkedin.com/company/1148792?trk=tyah


Re: Best practice for using singletons on workers (seems unanswered) ?

2015-07-08 Thread Richard Marscher
Ah, I see this is streaming. I haven't any practical experience with that
side of Spark. But the foreachPartition idea is a good approach. I've used
that pattern extensively, even though not for singletons, but just to
create non-serializable objects like API and DB clients on the executor
side. I think it's the most straightforward approach to dealing with any
non-serializable object you need.

I don't entirely follow what over-network data shuffling effects you are
alluding to (maybe more specific to streaming?).
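
For completeness, the non-singleton pattern I mentioned looks roughly like
this, where SomeApiClient and lookup stand in for whatever non-serializable
3rd party class and call you have:

val enriched = rdd.mapPartitions { records =>
  val client = new SomeApiClient()                       // built on the executor, never serialized
  val out = records.map(r => lookup(client, r)).toList   // materialize before leaving the partition
  out.iterator
}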

On Wed, Jul 8, 2015 at 9:41 AM, Dmitry Goldenberg dgoldenberg...@gmail.com
wrote:

 My singletons do in fact stick around. They're one per worker, looks
 like.  So with 4 workers running on the box, we're creating one singleton
 per worker process/jvm, which seems OK.

 Still curious about foreachPartition vs. foreachRDD though...

 On Tue, Jul 7, 2015 at 11:27 AM, Richard Marscher 
 rmarsc...@localytics.com wrote:

 Would it be possible to have a wrapper class that just represents a
 reference to a singleton holding the 3rd party object? It could proxy over
 calls to the singleton object which will instantiate a private instance of
 the 3rd party object lazily? I think something like this might work if the
 workers have the singleton object in their classpath.

 here's a rough sketch of what I was thinking:

 object ThirdPartySingleton {
   private lazy val thirdPartyObj = ...

   def someProxyFunction() = thirdPartyObj.()
 }

 class ThirdPartyReference extends Serializable {
   def someProxyFunction() = ThirdPartySingleton.someProxyFunction()
 }

 also found this SO post:
 http://stackoverflow.com/questions/26369916/what-is-the-right-way-to-have-a-static-object-on-all-workers


 On Tue, Jul 7, 2015 at 11:04 AM, dgoldenberg dgoldenberg...@gmail.com
 wrote:

 Hi,

 I am seeing a lot of posts on singletons vs. broadcast variables, such as
 *

 http://apache-spark-user-list.1001560.n3.nabble.com/Best-way-to-have-some-singleton-per-worker-tt20277.html
 *

 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-share-a-NonSerializable-variable-among-tasks-in-the-same-worker-node-tt11048.html#a21219

 What's the best approach to instantiate an object once and have it be
 reused
 by the worker(s).

 E.g. I have an object that loads some static state such as e.g. a
 dictionary/map, is a part of 3rd party API and is not serializable.  I
 can't
 seem to get it to be a singleton on the worker side as the JVM appears
 to be
 wiped on every request so I get a new instance.  So the singleton doesn't
 stick.

 Is there an approach where I could have this object or a wrapper of it
 be a
 broadcast var? Can Kryo get me there? would that basically mean writing a
 custom serializer?  However, the 3rd party object may have a bunch of
 member
 vars hanging off it, so serializing it properly may be non-trivial...

 Any pointers/hints greatly appreciated.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Best-practice-for-using-singletons-on-workers-seems-unanswered-tp23692.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Re: Best practice for using singletons on workers (seems unanswered) ?

2015-07-07 Thread Richard Marscher
Would it be possible to have a wrapper class that just represents a
reference to a singleton holding the 3rd party object? It could proxy over
calls to the singleton object which will instantiate a private instance of
the 3rd party object lazily? I think something like this might work if the
workers have the singleton object in their classpath.

here's a rough sketch of what I was thinking:

object ThirdPartySingleton {
  private lazy val thirdPartyObj = ...

  def someProxyFunction() = thirdPartyObj.()
}

class ThirdPartyReference extends Serializable {
  def someProxyFunction() = ThirdPartySingleton.someProxyFunction()
}

also found this SO post:
http://stackoverflow.com/questions/26369916/what-is-the-right-way-to-have-a-static-object-on-all-workers


On Tue, Jul 7, 2015 at 11:04 AM, dgoldenberg dgoldenberg...@gmail.com
wrote:

 Hi,

 I am seeing a lot of posts on singletons vs. broadcast variables, such as
 *

 http://apache-spark-user-list.1001560.n3.nabble.com/Best-way-to-have-some-singleton-per-worker-tt20277.html
 *

 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-share-a-NonSerializable-variable-among-tasks-in-the-same-worker-node-tt11048.html#a21219

 What's the best approach to instantiate an object once and have it be
 reused
 by the worker(s).

 E.g. I have an object that loads some static state such as e.g. a
 dictionary/map, is a part of 3rd party API and is not serializable.  I
 can't
 seem to get it to be a singleton on the worker side as the JVM appears to
 be
 wiped on every request so I get a new instance.  So the singleton doesn't
 stick.

 Is there an approach where I could have this object or a wrapper of it be a
 broadcast var? Can Kryo get me there? would that basically mean writing a
 custom serializer?  However, the 3rd party object may have a bunch of
 member
 vars hanging off it, so serializing it properly may be non-trivial...

 Any pointers/hints greatly appreciated.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Best-practice-for-using-singletons-on-workers-seems-unanswered-tp23692.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: How to create empty RDD

2015-07-06 Thread Richard Marscher
This should work

val output: RDD[(DetailInputRecord, VISummary)] =
sc.parallelize(Seq.empty[(DetailInputRecord, VISummary)])
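
sc.emptyRDD should also work if you want to be explicit about it:

val output: RDD[(DetailInputRecord, VISummary)] =
  sc.emptyRDD[(DetailInputRecord, VISummary)]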

On Mon, Jul 6, 2015 at 5:11 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 I need to return an empty RDD of type

 val output: RDD[(DetailInputRecord, VISummary)]



 This does not work

 val output: RDD[(DetailInputRecord, VISummary)] = new RDD()

 as RDD is abstract class.

 How do i create empty RDD ?
 --
 Deepak




Re: Spark driver hangs on start of job

2015-07-02 Thread Richard Marscher
Ah I see, glad that simple patch works for your problem. That seems to be a
different underlying problem than we have been experiencing. In our case,
the executors are failing properly, its just that none of the new ones will
ever escape experiencing the same exact issue. So we start a death spiral
of thousands of failed executors, all of which can't connect with the
driver. Meanwhile, the driver just sits there in the zombie state doing
nothing while it waits for executors to respond. In that light, my solution
is geared towards solving things on the driver-side gracefully.


On Thu, Jul 2, 2015 at 4:37 AM, Sjoerd Mulder sjoerdmul...@gmail.com
wrote:

 Hi Richard,

 I have actually applied the following fix to our 1.4.0 version and this
 seem to resolve the zombies :)

 https://github.com/apache/spark/pull/7077/files

 Sjoerd

 2015-06-26 20:08 GMT+02:00 Richard Marscher rmarsc...@localytics.com:

 Hi,

 we are on 1.3.1 right now so in case there are differences in the Spark
 files I'll walk through the logic of what we did and post a couple gists at
 the end. We haven't committed to forking Spark for our own deployments yet,
 so right now we shadow some Spark classes in our application code with our
 versions of the classes. Keep in mind I am not a Spark committer so the
 following is a best effort basis that is working for us. But it may be that
 someone more knowledgeable about the Spark codebase might see a pitfall to
 my solution or a better solution.

 --

 First, we'll start with the root issue in TaskSchedulerImpl. You will
 find the code that prints the Initial job has not accepted any resources
 warning inside the submitTasks function. Spark creates a separate thread
 that checks some conditions every STARVATION_TIMEOUT milliseconds until
 the submitted task set has been launched. It only posts the warn logging
 here and does nothing. I will come back to this part of the code in a
 moment.

 The code that determines when the hasLaunchedTask flag gets set (and
 thus closes out the starvation thread and the task set is being worked on
 by the cluster) is within the resourceOffers function. The various Spark
 Scheduler Backend classes will periodically call this function in
 TaskSchedulerImpl until cluster resources have been assigned to the task
 set.

 To start signaling the zombied scenario, I created a new flag: @volatile
 private var hasZombied = false. In our experience we always get the
 resources in resourceOffer before the starvation thread runs, otherwise we
 have always hit the zombie scenario if resources weren't allocated yet. So
  I added a conditional before the if (tasks.size > 0) { hasLaunchedTask =
  true } block. The conditional checks if (!hasLaunchedTask && hasZombied) {
 dagScheduler.ourCustomFunction() }. I'll explain that DAGScheduler call in
 a moment.

 The last detail here is to add code inside the starvation thread block
 after it posts the warning log. Set hasZombied to true and then call
 this.cancel() to stop the starvation thread from continuing to run. With
 this we now have all the steps needed inside TaskSchedulerImpl to start
 signaling out the zombied condition.

 Back to the custom function. DAGScheduler has reference to the
 appropriate Spark listeners that can propagate errors to the task set and
 more importantly back to your application code. If you look at DAGScheduler
 class, you will find a function called cleanUpAfterSchedulerStop(). This
 function does everything we want, except it is hard coded to a specific
 exception val error = new SparkException(...). What I did was copy this
 and made another function that returned a custom Exception I created that I
 use to signal the zombie, something like
 SparkTaskResourceAllocationZombieError. Now you call this function within
 the conditional block in TaskSchedulerImpl.resourceOffers and you should
 see your exception propagating out to your application code so you can take
 appropriate actions.

 In our case, we are submitting Spark applications programmatically from a
 Scala application service on an EC2 instance to a Spark Standalone cluster
 in EC2. Whenever we see this error, the application service EC2 instance is
 unable to get resources from the cluster even when attempting subsequent
 Spark applications for a long period of time (it eventually recovers hours
 or days later but that is not useful for us). So in our case we need to
 reschedule the failed Spark application on another EC2 application instance
 and shut down this current EC2 instance because it can no longer get
 cluster resources. Your use case may be different, but at least action can
 be taken at an application level.

 Here is some source code, you should be able to locate most of my
 additions to the code by searching for comments starting with //
 Localytics Code
 TaskSchedulerImpl gist:
 https://gist.github.com/rmarsch/e5d298e582ab75957957
 DAGScheduler gist: https://gist.github.com/rmarsch/ae8f5bb03b11e8d4f8f7

 Regards,
 Richard

Re: Applying functions over certain count of tuples .

2015-06-29 Thread Richard Marscher
Hi,

not sure what the context is but I think you can do something similar with
mapPartitions:

rdd.mapPartitions { iterator =>

  iterator.grouped(5).map { tupleGroup => emitOneRddForGroup(tupleGroup) }

}

The edge case is when the final grouping doesn't have exactly 5 items, if
that matters.

On Mon, Jun 29, 2015 at 3:57 PM, anshu shukla anshushuk...@gmail.com
wrote:

 I want to apply some logic on the basis of a fixed count of tuples in each
 RDD. *Suppose we emit one RDD for every 5 tuples of the previous RDD.*



 --
 Thanks  Regards,
 Anshu Shukla



Re: Spark driver hangs on start of job

2015-06-26 Thread Richard Marscher
We've seen this issue as well in production. We also aren't sure what
causes it, but have just recently shaded some of the Spark code in
TaskSchedulerImpl that we use to effectively bubble up an exception from
Spark instead of zombie in this situation. If you are interested I can go
into more detail about that. Otherwise I'm also keen to find out more on
how this might be happening.

On Fri, Jun 26, 2015 at 8:28 AM, Sjoerd Mulder sjoerdmul...@gmail.com
wrote:

 Hi,

 I have a really annoying issue that i cannot replicate consistently, still
 it happens every +- 100 submissions. (it's a job that's running every 3
 minutes).
 Already reported an issue for this:
 https://issues.apache.org/jira/browse/SPARK-8592

 Here are the Thread dump of the Driver and the Executor:
 https://docs.google.com/document/d/1x7ZwUzlvRqeJQ12FoGhpLV1zqDAmVsaF2HYhzkPNBKQ

 Any direction is should look into?

 Spark 1.4.0
 Java 1.8.0_45 (Oracle Corporation)
 Scala 2.11.6

 I already tried to resolve the NPE by not logging the ActorRef. This makes
 the NPE go away :)

 But  the root cause lies deeper I expect, since then the driver then still
 hangs with the *WARN TaskSchedulerImpl: Initial job has not accepted any
 resources; check your cluster UI to ensure that workers are registered and
 have sufficient resources* messages. But there are enough resources
 available in the cluster, it has plenty of CPU and Memory left.

 Logs from Driver:

 15/06/26 11:58:19 INFO Remoting: Starting remoting
 15/06/26 11:58:19 INFO Remoting: Remoting started; listening on addresses
 :[akka.tcp://sparkDriver@172.17.0.123:51415]
 15/06/26 11:58:19 INFO Utils: Successfully started service 'sparkDriver'
 on port 51415.
 15/06/26 11:58:20 INFO SparkEnv: Registering MapOutputTracker
 15/06/26 11:58:20 INFO SparkEnv: Registering BlockManagerMaster
 15/06/26 11:58:20 INFO DiskBlockManager: Created local directory at
 /tmp/spark-ff1f5a88-4e1d-4fe0-9c54-890e4174f02a/blockmgr-92b1e974-53bb-45a3-b918-916759e14630
 15/06/26 11:58:20 INFO MemoryStore: MemoryStore started with capacity
 265.1 MB
 15/06/26 11:58:20 INFO HttpFileServer: HTTP File server directory is
 /tmp/spark-ff1f5a88-4e1d-4fe0-9c54-890e4174f02a/httpd-f5894293-33aa-4eaa-9740-4a36c054b6c8
 15/06/26 11:58:20 INFO HttpServer: Starting HTTP Server
 15/06/26 11:58:20 INFO Utils: Successfully started service 'HTTP file
 server' on port 33176.
 15/06/26 11:58:20 INFO SparkEnv: Registering OutputCommitCoordinator
 15/06/26 11:58:20 INFO Utils: Successfully started service 'SparkUI' on
 port 4040.
 15/06/26 11:58:20 INFO SparkUI: Started SparkUI at
 http://172.17.0.123:4040
 15/06/26 11:58:20 INFO SparkContext: Added JAR
 file:/opt/jar/spark/spark-job-1.0-SNAPSHOT.jar at
 http://172.17.0.123:33176/jars/spark-job-1.0-SNAPSHOT.jar with timestamp
 1435319900257
 15/06/26 11:58:20 INFO AppClient$ClientActor: Connecting to master
 akka.tcp://sparkMaster@172.17.42.1:7077/user/Master...
 15/06/26 11:58:20 INFO SparkDeploySchedulerBackend: Connected to Spark
 cluster with app ID app-20150626115820-0917
 15/06/26 11:58:20 INFO AppClient$ClientActor: Executor added:
 app-20150626115820-0917/0 on worker-20150625133752-10.0.7.171-47050 (
 10.0.7.171:47050) with 1 cores
 15/06/26 11:58:20 INFO SparkDeploySchedulerBackend: Granted executor ID
 app-20150626115820-0917/0 on hostPort 10.0.7.171:47050 with 1 cores, 2.0
 GB RAM
 15/06/26 11:58:20 INFO TaskSchedulerImpl: Starting speculative execution
 thread
 15/06/26 11:58:20 INFO AppClient$ClientActor: Executor updated:
 app-20150626115820-0917/0 is now LOADING
 15/06/26 11:58:20 INFO AppClient$ClientActor: Executor updated:
 app-20150626115820-0917/0 is now RUNNING
 15/06/26 11:58:20 INFO Utils: Successfully started service
 'org.apache.spark.network.netty.NettyBlockTransferService' on port 52000.
 15/06/26 11:58:20 INFO NettyBlockTransferService: Server created on 52000
 15/06/26 11:58:20 INFO BlockManagerMaster: Trying to register BlockManager
 15/06/26 11:58:20 INFO BlockManagerMasterEndpoint: Registering block
 manager 172.17.0.123:52000 with 265.1 MB RAM, BlockManagerId(driver,
 172.17.0.123, 52000)
 15/06/26 11:58:20 INFO BlockManagerMaster: Registered BlockManager
 15/06/26 11:58:20 INFO SparkDeploySchedulerBackend: SchedulerBackend is
 ready for scheduling beginning after reached minRegisteredResourcesRatio:
 0.0
 15/06/26 11:58:24 INFO Exchange: Using SparkSqlSerializer2.
 15/06/26 11:58:24 INFO Exchange: Using SparkSqlSerializer2.
 15/06/26 11:58:24 INFO SparkContext: Starting job: map at
 SparkProductEventAggregator.scala:144
 15/06/26 11:58:24 INFO Version: Elasticsearch Hadoop v2.1.0.rc1
 [5cc3f53084]
 15/06/26 11:58:24 INFO ScalaEsRowRDD: Reading from
 [675d42c8-9823-4d3c-8e86-5aa611d38770/events]
 15/06/26 11:58:24 INFO ScalaEsRowRDD: Discovered mapping
 {675d42c8-9823-4d3c-8e86-5aa611d38770=[REMOVED]} for
 [675d42c8-9823-4d3c-8e86-5aa611d38770/events]
 15/06/26 11:58:24 INFO DAGScheduler: Registering RDD 5 (map at
 SparkProductEventAggregator.scala:144)
 

Re: Spark driver hangs on start of job

2015-06-26 Thread Richard Marscher
Hi,

we are on 1.3.1 right now so in case there are differences in the Spark
files I'll walk through the logic of what we did and post a couple gists at
the end. We haven't committed to forking Spark for our own deployments yet,
so right now we shadow some Spark classes in our application code with our
versions of the classes. Keep in mind I am not a Spark committer so the
following is a best effort basis that is working for us. But it may be that
someone more knowledgeable about the Spark codebase might see a pitfall to
my solution or a better solution.

--

First, we'll start with the root issue in TaskSchedulerImpl. You will find
the code that prints the Initial job has not accepted any resources
warning inside the submitTasks function. Spark creates a separate thread
that checks some conditions every STARVATION_TIMEOUT milliseconds until
the submitted task set has been launched. It only posts the warn logging
here and does nothing. I will come back to this part of the code in a
moment.

The code that determines when the hasLaunchedTask flag gets set (and thus
closes out the starvation thread and the task set is being worked on by the
cluster) is within the resourceOffers function. The various Spark
Scheduler Backend classes will periodically call this function in
TaskSchedulerImpl until cluster resources have been assigned to the task
set.

To start signaling the zombied scenario, I created a new flag: @volatile
private var hasZombied = false. In our experience we always get the
resources in resourceOffer before the starvation thread runs, otherwise we
have always hit the zombie scenario if resources weren't allocated yet. So
I added a conditional before the if (tasks.size > 0) { hasLaunchedTask =
true } block. The conditional checks if (!hasLaunchedTask && hasZombied) {
dagScheduler.ourCustomFunction() }. I'll explain that DAGScheduler call in
a moment.

The last detail here is to add code inside the starvation thread block
after it posts the warning log. Set hasZombied to true and then call
this.cancel() to stop the starvation thread from continuing to run. With
this we now have all the steps needed inside TaskSchedulerImpl to start
signaling out the zombied condition.

Back to the custom function. DAGScheduler has reference to the appropriate
Spark listeners that can propagate errors to the task set and more
importantly back to your application code. If you look at DAGScheduler
class, you will find a function called cleanUpAfterSchedulerStop(). This
function does everything we want, except it is hard coded to a specific
exception val error = new SparkException(...). What I did was copy this
and made another function that returned a custom Exception I created that I
use to signal the zombie, something like
SparkTaskResourceAllocationZombieError. Now you call this function within
the conditional block in TaskSchedulerImpl.resourceOffers and you should
see your exception propagating out to your application code so you can take
appropriate actions.

In our case, we are submitting Spark applications programmatically from a
Scala application service on an EC2 instance to a Spark Standalone cluster
in EC2. Whenever we see this error, the application service EC2 instance is
unable to get resources from the cluster even when attempting subsequent
Spark applications for a long period of time (it eventually recovers hours
or days later but that is not useful for us). So in our case we need to
reschedule the failed Spark application on another EC2 application instance
and shut down this current EC2 instance because it can no longer get
cluster resources. Your use case may be different, but at least action can
be taken at an application level.

Here is some source code, you should be able to locate most of my additions
to the code by searching for comments starting with // Localytics Code
TaskSchedulerImpl gist: https://gist.github.com/rmarsch/e5d298e582ab75957957
DAGScheduler gist: https://gist.github.com/rmarsch/ae8f5bb03b11e8d4f8f7

Regards,
Richard

On Fri, Jun 26, 2015 at 12:08 PM, Sjoerd Mulder sjoerdmul...@gmail.com
wrote:

 Hi Richard,

 I would  like to see how we can get a workaround to get out of the Zombie
 situation since were planning for production :)

 If you could share the workaround or point directions that would be great!

 Sjoerd

 2015-06-26 16:53 GMT+02:00 Richard Marscher rmarsc...@localytics.com:

 We've seen this issue as well in production. We also aren't sure what
 causes it, but have just recently shaded some of the Spark code in
 TaskSchedulerImpl that we use to effectively bubble up an exception from
 Spark instead of zombie in this situation. If you are interested I can go
 into more detail about that. Otherwise I'm also keen to find out more on
 how this might be happening.

 On Fri, Jun 26, 2015 at 8:28 AM, Sjoerd Mulder sjoerdmul...@gmail.com
 wrote:

 Hi,

 I have a really annoying issue that i cannot replicate consistently,
 still it happens every +- 100 submissions. (it's a job

Re: Limitations using SparkContext

2015-06-23 Thread Richard Marscher
Hi,

can you detail the symptom further? Was it that only 12 requests were
services and the other 440 timed out? I don't think that Spark is well
suited for this kind of workload, or at least the way it is being
represented. How long does a single request take Spark to complete?

Even with fair scheduling, you will only be able to have a fixed amount of
tasks running on Spark at once. Usually this is bounded by the max cores
setting in configuration. Since you mention local as a comparison point I
get the impression you are running Spark Standalone for cluster. The
implication, if this is reflective of your current setup, is that you
aren't going to get much concurrency for separate spray requests. lets say
your max cores is 16 and your number of tasks/partitions per stage of your
spark DAG is 8. Then at any given time only 2 requests can be serviced. It
may also be the case that with fair scheduling that a single request gets
pre-empted after completing one stage of the DAG and has to wait to
continue instead of proceeding directly to the next stage.

This hypothesis would also support the observation that local is no better
than cluster, because you probably have even less concurrent spark tasks
available on the single local machine.


spark.cores.max (default: not set) - When running on a standalone deploy cluster
https://spark.apache.org/docs/1.3.1/spark-standalone.html or a Mesos
cluster in coarse-grained sharing mode
https://spark.apache.org/docs/1.3.1/running-on-mesos.html#mesos-run-modes,
the maximum amount of CPU cores to request for the application from across
the cluster (not from each machine). If not set, the default will be
spark.deploy.defaultCores on Spark's standalone cluster manager, or
infinite (all available cores) on Mesos.
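
For reference, capping an application's share of a standalone cluster looks
something like this (the master URL and number are only illustrative):

val conf = new SparkConf()
  .setMaster("spark://master:7077")
  .set("spark.cores.max", "16")   // at most 16 cores, i.e. roughly 16 concurrent tasks
val sc = new SparkContext(conf)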

On Tue, Jun 23, 2015 at 12:44 PM, daunnc dau...@gmail.com wrote:

 So the situation is following: got a spray server, with a spark context
 available (fair scheduling in a cluster mode, via spark-submit). There are
 some http urls, which calling spark rdd, and collecting information from
 accumulo / hdfs / etc (using rdd). Noticed, that there is a sort of
 limitation, on requests:

 wrk -t8 -c50 -d30s http://localhost:/…/;
 Running 30s test @ http://localhost:/…/
   8 threads and 50 connections
   Thread Stats   Avg  Stdev Max   +/- Stdev
 Latency 1.03s   523.30ms   1.70s50.00%
 Req/Sec 6.05  5.4920.00 71.58%
   452 requests in 30.04s, 234.39KB read
   Socket errors: connect 0, read 0, write 0, timeout 440

 So this happens on making some calls with spark rdd (not depends on called
 function), and in browser you can see ERR_EMPTY_RESPONSE

 Now the solution was to use cache, but want to know about this limitations,
 or mb some settings.
 This error happens in local mode and in cluster mode, so guess not depends
 on it.

 P.S. logs are clear (or simply don't know where to look, but stdout of a
 spar-submit in a client mode is clear).



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Limitations-using-SparkContext-tp23452.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: How Spark Execute chaining vs no chaining statements

2015-06-23 Thread Richard Marscher
There should be no difference assuming you don't use the intermediately
stored rdd values you are creating for anything else (rdd1, rdd2). In the
first example it still is creating these intermediate rdd objects you are
just using them implicitly and not storing the value.

It's also worth pointing out that Spark is able to pipeline operations
together into stages. That is, it should effectively translate something
like like map(f1).map(f2).map(f3) to map(f1 - f2 - f3) in pseudcode, if
you will. Here is a more detailed explanation from one of the committer's
on SO:
http://stackoverflow.com/questions/19340808/spark-single-pipelined-scala-command-better-than-separate-commands

On Tue, Jun 23, 2015 at 5:17 PM, Ashish Soni asoni.le...@gmail.com wrote:

 Hi All ,

 What is difference between below in terms of execution to the cluster with
 1 or more worker node

 rdd.map(...).map(...)...map(..)

 vs

 val rdd1 = rdd.map(...)
 val rdd2 = rdd1.map(...)
 val rdd3 = rdd2.map(...)

 Thanks,
 Ashish



Re: Multiple executors writing file using java filewriter

2015-06-22 Thread Richard Marscher
Is spoutLog just a non-spark file writer? If you run that in the map call
on a cluster it's going to be writing to the filesystem of whichever executor
it runs on. I'm not sure if that's what you intended.

On Mon, Jun 22, 2015 at 1:35 PM, anshu shukla anshushuk...@gmail.com
wrote:

 Running perfectly in local system but not writing to file in cluster mode 
 .ANY suggestions please ..


 //msgid is long counter

 JavaDStream<String> newinputStream = inputStream.map(new Function<String,
 String>() {
 @Override
 public String call(String v1) throws Exception {
 String s1 = msgId + "@" + v1;
 System.out.println(s1);
 msgId++;
 try {
 *//filewriter logic
 spoutlog.batchLogwriter(System.currentTimeMillis(), "spout-MSGID," + 
 msgeditor.getMessageId(s1));*
 } catch (Exception e) {

 System.out.println("exception is here");
 e.printStackTrace();
 throw e;
 }
 System.out.println("msgid," + msgId);
 return  msgeditor.addMessageId(v1,msgId);
 }
 });


 --
 Thanks  Regards,
 Anshu Shukla

 On Mon, Jun 22, 2015 at 10:50 PM, anshu shukla anshushuk...@gmail.com
 wrote:

 Can not we  write some data to a txt file  in parallel with multiple
 executors  running  in parallel ??


 --
 Thanks  Regards,
 Anshu Shukla




 --
 Thanks  Regards,
 Anshu Shukla



Re: Executor memory allocations

2015-06-18 Thread Richard Marscher
It would be the 40%, although it's probably better to think of it as
shuffle vs. data cache and the remainder goes to tasks. As the comments for
the shuffle memory fraction configuration clarify that it will be taking
memory at the expense of the storage/data cache fraction:

spark.shuffle.memoryFraction (default 0.2) - Fraction of Java heap to use for
aggregation and cogroups during shuffles, if spark.shuffle.spill is true. At any
given time, the collective size of all in-memory maps used for shuffles is
bounded by this limit, beyond which the contents will begin to spill to
disk. If spills are often, consider increasing this value at the expense of
spark.storage.memoryFraction.
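
As a rough illustration, shifting the split between cache and shuffle would
look something like this (the numbers are purely illustrative):

val conf = new SparkConf()
  .set("spark.storage.memoryFraction", "0.5")  // data cache share of the heap
  .set("spark.shuffle.memoryFraction", "0.3")  // shuffle aggregation share
// whatever remains is what tasks get for processing data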

On Wed, Jun 17, 2015 at 6:02 PM, Corey Nolet cjno...@gmail.com wrote:

 So I've seen in the documentation that (after the overhead memory is
 subtracted), the memory allocations of each executor are as follows (assume
 default settings):

 60% for cache
 40% for tasks to process data


 Reading about how Spark implements shuffling, I've also seen it say 20%
 of executor memory is utilized for shuffles Does this 20% cut into the 40%
 for tasks to process data or the 60% for the data cache?



Re: append file on hdfs

2015-06-10 Thread Richard Marscher
Hi,

if you now want to write 1 file per partition, that's actually built into
Spark as

saveAsTextFile(path) - Write the elements of the dataset as a text file
(or set of text files) in a given directory in the local filesystem, HDFS
or any other Hadoop-supported file system. Spark will call toString on each
element to convert it to a line of text in the file.
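
In Scala syntax, a rough sketch for your case would be (the output path is
just an example):

points
  .map { case (cluster, point) => s"Cluster: $cluster, Point: $point" }
  .saveAsTextFile("hdfs:///user/paul/results")  // one part-NNNNN file per partition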

On Wed, Jun 10, 2015 at 4:44 AM, Pa Rö paul.roewer1...@googlemail.com
wrote:

 hi,

 i have an idea to solve my problem, i want write one file for each spark
 partion,
 but i not know to get the actuel partion suffix/ID in my call function?

 points.foreachPartition(
 new VoidFunction<Iterator<Tuple2<Integer,
 GeoTimeDataTupel>>>() {

 private static final long serialVersionUID =
 -7210897529331503565L;

 public void call(Iterator<Tuple2<Integer,
 GeoTimeDataTupel>> entry) throws Exception {
 while(entry.hasNext()) {
 Tuple2<Integer, GeoTimeDataTupel> temp =
 entry.next();

 try {
 FileSystem fs = FileSystem.get(new
 URI(pro.getProperty("hdfs.namenode")), new Configuration());
 Path pt = new
 Path(fs.getHomeDirectory()+pro.getProperty("spark.output")+"/results");
 }
 catch(Exception e) {
 e.printStackTrace();
 }
 }
 }
 }
 );

 2015-06-09 15:34 GMT+02:00 Pa Rö paul.roewer1...@googlemail.com:

 hi community,

 i want append results to one file. if i work local my function build all
 right,
 if i run this on a yarn cluster, i lost same rows.

 here my function to write:

 points.foreach(
 new VoidFunctionTuple2Integer, GeoTimeDataTupel() {

 private static final long serialVersionUID =
 2459995649387229261L;

  public void call(Tuple2<Integer, GeoTimeDataTupel>
  entry) throws Exception {
 try {
  FileSystem fs = FileSystem.get(new
  URI(pro.getProperty("hdfs.namenode")), new Configuration());
  Path pt = new
  Path(fs.getHomeDirectory()+pro.getProperty("spark.output")+"/results");

 if(fs.exists(pt)) {
 FSDataInputStream in = fs.open(pt);
  Path pt_temp = new
  Path(fs.getHomeDirectory()+pro.getProperty("spark.output")+"/results_temp");
 backup(fs.getConf(), fs, in, pt_temp);
 in.close();

 FSDataOutputStream out = fs.create((pt),
 true);
 FSDataInputStream backup = fs.open(pt_temp);

 int offset = 0;
 int bufferSize = 4096;

 int result = 0;

 byte[] buffer = new byte[bufferSize];
 // pre read a part of content from input
 stream
 result = backup.read(offset, buffer, 0,
 bufferSize);
 // loop read input stream until it does not
 fill whole size of buffer
 while (result == bufferSize) {
 out.write(buffer);
 // read next segment from input stream by
 moving the offset pointer
 offset += bufferSize;
 result = backup.read(offset, buffer, 0,
 bufferSize);
 }

  if (result > 0 && result < bufferSize) {
 for (int i = 0; i  result; i++) {
 out.write(buffer[i]);
 }
 }
  out.writeBytes("Cluster: "+entry._1+", Point: "
  +entry._2.toString()+"\n");
 out.close();
 }
 else {
 BufferedWriter bw =new BufferedWriter(new
 OutputStreamWriter(fs.create(pt)));
  bw.write("Cluster: "+entry._1+", Point: "
  +entry._2.toString()+"\n");
 bw.close();
 }
 } catch (Exception e) {
 e.printStackTrace();
 }
 }

 public void backup(Configuration conf, FileSystem
 fs,FSDataInputStream sourceContent, Path pt_temp) throws Exception {

 FSDataOutputStream out = fs.create(pt_temp, true);
 IOUtils.copyBytes(sourceContent, out, 4096, false);
 out.close();
 }

 where is my fault?? or give it a function to write(append) to the hadoop
 hdfs?

 best regards,
 paul





Re: spark eventLog and history server

2015-06-09 Thread Richard Marscher
Hi,

I don't have a complete answer to your questions but:

Removing the suffix does not solve the problem - unfortunately this is
true, the master web UI only tries to build out a Spark UI from the event
logs once, at the time the context is closed. If the event logs are
in-progress at this time, then you basically missed the opportunity.

Does it mean I don't need to start history server if I only use spark in
standalone mode? - Yes, you don't need to start the history server.

On Mon, Jun 8, 2015 at 7:57 PM, Du Li l...@yahoo-inc.com.invalid wrote:

 Event log is enabled in my spark streaming app. My code runs in standalone
 mode and the spark version is 1.3.1. I periodically stop and restart the
 streaming context by calling ssc.stop(). However, from the web UI, when
 clicking on a past job, it says the job is still in progress and does not
 show the event log. The event log files have suffix .inprogress. Removing
 the suffix does not solve the problem. Do I need to do anything here in
 order to view the event logs of finished jobs? Or do I need to stop ssc
 differently?

 In addition, the documentation seems to suggest history server is used for
 Mesos or YARN mode. Does it mean I don't need to start history server if I
 only use spark in standalone mode?

 Thanks,
 Du



FileOutputCommitter deadlock 1.3.1

2015-06-08 Thread Richard Marscher
Hi,

we've been seeing occasional issues in production with the FileOutCommitter
reaching a deadlock situation.

We are writing our data to S3 and currently have speculation enabled. What
we see is that Spark get's a file not found error trying to access a
temporary part file that it wrote (part-#2 file it seems to be every
time?), so the task fails. But the file actually exists in S3 so subsequent
speculations and task retries all fail because the committer tells them the
file exists. This will persist until human intervention kills the
application. Usually rerunning the application will succeed on the next try
so it is not deterministic with the dataset or anything.

It seems like there isn't a good story yet for file writing and speculation
(https://issues.apache.org/jira/browse/SPARK-4879), although our error here
seems worse than the reports in that issue since I believe ours deadlocks and
those don't?

Has anyone else observed deadlocking like this?

Thanks,
Richard


Re: Deduping events using Spark

2015-06-04 Thread Richard Marscher
I think if you create a bidirectional mapping from AnalyticsEvent to
another type that would wrap it and use the nonce as its equality, you
could then do something like reduceByKey to group by nonce and map back to
AnalyticsEvent after.
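
If the nonce itself can serve as a key, a simpler variant of the same idea is
to key by it directly. A rough sketch in Scala syntax, assuming getNonce
returns something with sensible equality such as a String:

val deduped = events
  .map(e => (e.getNonce, e))    // key every event by its nonce
  .reduceByKey((a, b) => a)     // keep one event per nonce, drop the duplicates
  .values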

On Thu, Jun 4, 2015 at 1:10 PM, lbierman leebier...@gmail.com wrote:

 I'm still a bit new to Spark and am struggilng to figure out the best way
 to
 Dedupe my events.

 I load my Avro files from HDFS and then I want to dedupe events that have
 the same nonce.

 For example my code so far:

  JavaRDD<AnalyticsEvent> events = ((JavaRDD<AvroKey<AnalyticsEvent>>)
 context.newAPIHadoopRDD(
 context.hadoopConfiguration(),
 AvroKeyInputFormat.class,
 AvroKey.class,
 NullWritable.class
 ).keys())
 .map(event -> AnalyticsEvent.newBuilder(event.datum()).build())
 .filter(key -> { return
 Optional.ofNullable(key.getStepEventKey()).isPresent(); })

 Now I want to get back an RDD of AnalyticsEvents that are unique. So I
 basically want to do:
 if AnalyticsEvent.getNonce() == AnalyticsEvent2.getNonce() only return 1 of
 them.

 I'm not sure how to do this? If I do reduceByKey it reduces by
 AnalyticsEvent not by the values inside?

 Any guidance would be much appreciated how I can walk this list of events
 and only return a filtered version of unique nocnes.






 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Deduping-events-using-Spark-tp23153.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Scaling spark jobs returning large amount of data

2015-06-04 Thread Richard Marscher
It is possible to start multiple concurrent drivers; Spark dynamically
allocates ports per spark application on driver, master, and workers from
a port range. When you collect results back to the driver, they do not go
through the master. The master is mostly there as a coordinator between the
driver and the cluster of worker nodes, but otherwise the workers and
driver communicate directly for the underlying workload.

A spark application relates to one instance of a SparkContext
programmatically or to one call to one of the spark submit scripts.
Assuming you don't have dynamic resource allocation setup, each application
takes a fixed amount of the cluster resources to run. So as long as you
subdivide your cluster resources properly you can run multiple concurrent
applications against it. We are doing this in production presently.

Alternately, as Igor suggests, you can share a spark application and launch
different jobs within it. They will share the resources allocated to the
application in this case. An effect of this is you will only have a finite
amount of concurrent spark tasks (roughly translates to 1 task can execute
1 partition of a job at a time). If you launch multiple independent jobs
within the same application you will likely want to enable fair job
scheduling, otherwise stages between independent jobs will run in a FIFO
order instead of interleaving execution.
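
A rough sketch of the shared-application variant in Scala, where runReport and
the pool size are placeholders:

import java.util.concurrent.Executors
import org.apache.spark.{SparkConf, SparkContext}
import scala.concurrent.{ExecutionContext, Future}

val conf = new SparkConf().set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)

implicit val ec = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4))

// each report is an independent Spark job; submitting them from separate
// threads lets the fair scheduler interleave their stages
val pending = Seq("reportA", "reportB", "reportC").map { name =>
  Future { runReport(sc, name) }
}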

Hope this helps,
Richard

On Thu, Jun 4, 2015 at 11:20 AM, Igor Berman igor.ber...@gmail.com wrote:

 Hi,
 as far as I understand you shouldn't send data to driver. Suppose you have
 file in hdfs/s3 or cassandra partitioning, you should create your job such
 that every executor/worker of spark will handle part of your input,
 transform, filter it and at the end write back to cassandra as output(once
 again every executor/core inside worker will write part of the output, in
 your case they will write part of report)

 In general I find that submitting multiple jobs in same spark context(aka
 driver) is more performant(you don't pay startup-shutdown time), for this
 some use rest server for submitting jobs to long running spark
 context(driver)

 I'm not sure you can run multiple concurrent drivers because of ports

 On 4 June 2015 at 17:30, Giuseppe Sarno giuseppesa...@fico.com wrote:

  Hello,

 I am relatively new to spark and I am currently trying to understand how
 to scale large numbers of jobs with spark.

 I understand that spark architecture is split in “Driver”, “Master” and
 “Workers”. Master has a standby node in case of failure and workers can
 scale out.

 All the examples I have seen show Spark been able to distribute the load
 to the workers and returning small amount of data to the Driver. In my case
 I would like to explore the scenario where I need to generate a large
 report on data stored on Cassandra and understand how Spark architecture
 will handle this case when multiple report jobs will be running in parallel.

 According to this  presentation
 https://trongkhoanguyenblog.wordpress.com/2015/01/07/understand-the-spark-deployment-modes/
 responses from workers go through the Master and finally to the Driver.
 Does this mean that the Driver and/ or Master is a single point for all the
 responses coming back from workers ?

 Is it possible to start multiple concurrent Drivers ?



 Regards,

 Giuseppe.








Re: Application is always in process when I check out logs of completed application

2015-06-03 Thread Richard Marscher
I had the same issue a couple days ago. It's a bug in 1.3.0 that is fixed
in 1.3.1 and up.

https://issues.apache.org/jira/browse/SPARK-6036

The ordering that the event logs are moved from in-progress to complete is
coded to be after the Master tries to build the history page for the logs.
The only reason it even works on occasion in 1.3.0 is because the Master
part is run asynchronously and the event log status change is synchronous,
so the Master part on some occasions could be executed afterwards as a race
condition.

On Wed, Jun 3, 2015 at 2:17 AM, ayan guha guha.a...@gmail.com wrote:

 Have you done sc.stop() ? :)
 On 3 Jun 2015 14:05, amghost zhengweita...@outlook.com wrote:

 I run spark application in spark standalone cluster with client deploy
 mode.
 I want to check out the logs of my finished application, but I always
 get  a
 page telling me Application history not found - Application xxx is still
 in
 process.
 I am pretty sure that the application has indeed completed because I can
 see
 it in the Completed Applications list show by Spark WebUI, and I have also
found the log file with suffix .inprogress in the directory set by
 spark.eventLog.dir in my spark-default.conf

 Oh, BTW, I am using spark 1.3.0

 So, is there anything I missed?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Application-is-always-in-process-when-I-check-out-logs-of-completed-application-tp23123.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark Client

2015-06-03 Thread Richard Marscher
I think the short answer to the question is, no, there is no alternate API
that will not use the System.exit calls. You can craft a workaround like is
being suggested in this thread. For comparison, we are doing programmatic
submission of applications in a long-running client application. To get
around these issues we make a shadowed version of some of the Spark code in
our application to remove the System.exit calls so instead exceptions
bubble up to our application.

On Wed, Jun 3, 2015 at 7:19 AM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Did you try this?

 Create an sbt project like:

  // Create your context
  val sconf = new
  SparkConf().setAppName("Sigmoid").setMaster("spark://sigmoid:7077")
  val sc = new SparkContext(sconf)

  // Do some computations
  sc.parallelize(1 to 1).take(10).foreach(println)

  //Now return the exit status
  System.exit(Some number)

  Now, make your workflow manager to trigger *sbt run* on the project
 instead of using spark-submit.



 Thanks
 Best Regards

 On Wed, Jun 3, 2015 at 2:18 PM, pavan kumar Kolamuri 
 pavan.kolam...@gmail.com wrote:

 Hi Akhil, sorry, I may not have conveyed the question properly. Actually we
 are looking to launch a Spark job from a long-running workflow manager, which
 invokes the Spark client via SparkSubmit. Unfortunately the client exits with
 System.exit(0) upon successful completion of the application, or
 System.exit(NON_ZERO) when there is a failure. The question is: is there an
 alternate API through which a Spark application can be launched that returns
 an exit status back to the caller, as opposed to initiating a JVM halt?

 On Wed, Jun 3, 2015 at 12:58 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Run it as a standalone application. Create an sbt project and do sbt run?

 Thanks
 Best Regards

 On Wed, Jun 3, 2015 at 11:36 AM, pavan kumar Kolamuri 
 pavan.kolam...@gmail.com wrote:

 Hi guys, I am new to Spark. I am using SparkSubmit to submit Spark jobs, but
 for my use case I don't want it to exit with System.exit. Is there any other
 Spark client that is API-friendly, other than SparkSubmit, which doesn't exit
 with System.exit? Please correct me if I am missing something.

 Thanks in advance




 --
 Regards
 Pavan Kumar Kolamuri





 --
 Regards
 Pavan Kumar Kolamuri





Re: flatMap output on disk / flatMap memory overhead

2015-06-02 Thread Richard Marscher
Are you sure it's memory related? What is the disk utilization and IO
performance on the workers? The error you posted looks to be related to
shuffle trying to obtain block data from another worker node and failing to
do so in a reasonable amount of time. It may still be memory related, but I'm
not sure that other resources are ruled out yet.

On Tue, Jun 2, 2015 at 5:10 AM, octavian.ganea octavian.ga...@inf.ethz.ch
wrote:

 I tried using reduceByKey, without success.

 I also tried this: rdd.persist(MEMORY_AND_DISK).flatMap(...).reduceByKey .
 However, I got the same error as before, namely the error described here:

 http://apache-spark-user-list.1001560.n3.nabble.com/flatMap-output-on-disk-flatMap-memory-overhead-td23098.html

 My task is to count the frequencies of pairs of words that occur in a set
 of
 documents at least 5 times. I know that this final output is sparse and
 should comfortably fit in memory. However, the intermediate pairs that are
 spilled by flatMap might need to be stored on the disk, but I don't
 understand why the persist option does not work and my job fails.

 My code:

 rdd.persist(StorageLevel.MEMORY_AND_DISK)
   .flatMap(x => outputPairsOfWords(x)) // outputs pairs of type ((word1, word2), 1)
   .reduceByKey((a, b) => (a + b).toShort)
   .filter { case ((x, y), count) => count >= 5 }


 My cluster has 8 nodes, each with 129 GB of RAM and 16 cores per node. One
 node I keep for the master, 7 nodes for the workers.

 my conf:

 conf.set("spark.cores.max", "128")
 conf.set("spark.akka.frameSize", "1024")
 conf.set("spark.executor.memory", "115g")
 conf.set("spark.shuffle.file.buffer.kb", "1000")

 my spark-env.sh:
  ulimit -n 20
  SPARK_JAVA_OPTS=-Xss1g -Xmx129g -d64 -XX:-UseGCOverheadLimit
 -XX:-UseCompressedOops
  SPARK_DRIVER_MEMORY=129G

 spark version: 1.1.1

 Thank you a lot for your help!



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/flatMap-output-on-disk-flatMap-memory-overhead-tp23098p23108.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Event Logging to HDFS on Standalone Cluster In Progress

2015-06-01 Thread Richard Marscher
Hi,

In Spark 1.3.0 I've enabled event logging to write to an existing HDFS
folder on a Standalone cluster. This is generally working, all the logs are
being written. However, from the Master Web UI, the vast majority of
completed applications are labeled as not having a history:
http://xxx.xxx.xxx.xxx:8080/history/not-found/?msg=Application+App+is+still+in+progress.title=Application%20history%20not%20found%20(app-20150601160846-1914)

The log does exist, though:

# hdfs dfs -ls -R /eventLogs/app-20150601160846-1914

-rw-rw   3 user group      1027848 2015-06-01 16:09
/eventLogs/app-20150601160846-1914

and `cat` the file ends with:

{"Event":"SparkListenerApplicationEnd","Timestamp":1433174955077}

This seems to indicate it saw and logged the application end.

Is there a known issue here or a workaround? Looking at the source code I
might have expected these files to end in `.inprogress` given the UI error
message, but they don't.

Thanks,
Richard


Re: Event Logging to HDFS on Standalone Cluster In Progress

2015-06-01 Thread Richard Marscher
Ah, apologies, I found an existing issue and fix has already gone out for
this in 1.3.1 and up: https://issues.apache.org/jira/browse/SPARK-6036.

On Mon, Jun 1, 2015 at 3:39 PM, Richard Marscher rmarsc...@localytics.com
wrote:

 It looks like it is possibly a race condition between removing the
 IN_PROGRESS and building the history UI for the application.

 `AppClient` sends an `UnregisterApplication(appId)` message to the
 `Master` actor, which triggers the process to look for the app's eventLogs.
 If they are suffixed with `.inprogress` then it will not build out the
 history UI and instead build the error page I've seen.

 Tying this together, calling SparkContext.stop() has the following block:


  if (_dagScheduler != null) {
    _dagScheduler.stop()
    _dagScheduler = null
  }
  if (_listenerBusStarted) {
    listenerBus.stop()
    _listenerBusStarted = false
  }
  _eventLogger.foreach(_.stop())

 Dag Scheduler has a TaskScheduler which has a SparkDeploySchedulerBackend
 which has an AppClient. AppClient sends itself a message to stop itself,
 and like mentioned above, it then sends a message to the Master where it
 tries to build the history UI.

 Meanwhile, EventLoggingListener.stop() is where the `.inprogress` suffix is
 removed in the file system. It seems like the race condition of the Akka
 message passing that triggers the Master's building of the history UI may be
 the only reason the history UI ever gets properly set up in the first place,
 because if the ordering of calls were all strict in the SparkContext.stop
 method then you would expect the Master to always see the event logs as in
 progress.

 Maybe I have missed something in tracing through the code? Is there a
 reason that the eventLogger cannot be closed before the dagScheduler?

 Thanks,
 Richard

 On Mon, Jun 1, 2015 at 12:23 PM, Richard Marscher 
 rmarsc...@localytics.com wrote:

 Hi,

 In Spark 1.3.0 I've enabled event logging to write to an existing HDFS
 folder on a Standalone cluster. This is generally working, all the logs are
 being written. However, from the Master Web UI, the vast majority of
 completed applications are labeled as not having a history:
 http://xxx.xxx.xxx.xxx:8080/history/not-found/?msg=Application+App+is+still+in+progress.title=Application%20history%20not%20found%20(app-20150601160846-1914)

 The log does exist, though:

 # hdfs dfs -ls -R /eventLogs/app-20150601160846-1914

 -rw-rw   3 user group1027848 2015-06-01 16:09
 /eventLogs/app-20150601160846-1914

 and `cat` the file ends with:

 {"Event":"SparkListenerApplicationEnd","Timestamp":1433174955077}

 This seems to indicate it saw and logged the application end.

 Is there a known issue here or a workaround? Looking at the source code I
 might have expected these files to end in `.inprogress` given the UI error
 message, but they don't.

 Thanks,
 Richard





Re: Event Logging to HDFS on Standalone Cluster In Progress

2015-06-01 Thread Richard Marscher
It looks like it is possibly a race condition between removing the
IN_PROGRESS and building the history UI for the application.

`AppClient` sends an `UnregisterApplication(appId)` message to the `Master`
actor, which triggers the process to look for the app's eventLogs. If they
are suffixed with `.inprogress` then it will not build out the history UI
and instead build the error page I've seen.

Tying this together, calling SparkContext.stop() has the following block:


if (_dagScheduler != null) {
  _dagScheduler.stop()
  _dagScheduler = null
}
if (_listenerBusStarted) {
  listenerBus.stop()
  _listenerBusStarted = false
}
_eventLogger.foreach(_.stop())

Dag Scheduler has a TaskScheduler which has a SparkDeploySchedulerBackend
which has an AppClient. AppClient sends itself a message to stop itself,
and like mentioned above, it then sends a message to the Master where it
tries to build the history UI.

Meanwhile, EventLoggingListener.stop() is where the `.inprogress` suffix is
removed in the file system. It seems like the race condition of the Akka
message passing that triggers the Master's building of the history UI may be
the only reason the history UI ever gets properly set up in the first place,
because if the ordering of calls were all strict in the SparkContext.stop
method then you would expect the Master to always see the event logs as in
progress.

Maybe I have missed something in tracing through the code? Is there a
reason that the eventLogger cannot be closed before the dagScheduler?

Thanks,
Richard

On Mon, Jun 1, 2015 at 12:23 PM, Richard Marscher rmarsc...@localytics.com
wrote:

 Hi,

 In Spark 1.3.0 I've enabled event logging to write to an existing HDFS
 folder on a Standalone cluster. This is generally working, all the logs are
 being written. However, from the Master Web UI, the vast majority of
 completed applications are labeled as not having a history:
 http://xxx.xxx.xxx.xxx:8080/history/not-found/?msg=Application+App+is+still+in+progress.title=Application%20history%20not%20found%20(app-20150601160846-1914)

 The log does exist, though:

 # hdfs dfs -ls -R /eventLogs/app-20150601160846-1914

 -rw-rw   3 user group1027848 2015-06-01 16:09
 /eventLogs/app-20150601160846-1914

 and `cat` the file ends with:

 {"Event":"SparkListenerApplicationEnd","Timestamp":1433174955077}

 This seems to indicate it saw and logged the application end.

 Is there a known issue here or a workaround? Looking at the source code I
 might have expected these files to end in `.inprogress` given the UI error
 message, but they don't.

 Thanks,
 Richard



Re: Spark Fair Scheduler for Spark Streaming - 1.2 and beyond

2015-05-15 Thread Richard Marscher
The doc is a bit confusing IMO, but at least for my application I had to
use a fair pool configuration to get my stages to be scheduled with FAIR.
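
For reference, a minimal sketch of what that looks like (the pool name and
file path here are only examples; the allocation file has to declare the pool
with a FAIR schedulingMode for stages inside it to be scheduled fairly):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("fair-pool-example")
  .set("spark.scheduler.mode", "FAIR")
  .set("spark.scheduler.allocation.file", "/etc/spark/fairscheduler.xml")

val sc = new SparkContext(conf)

// Jobs submitted from this thread are assigned to the named pool
sc.setLocalProperty("spark.scheduler.pool", "my_fair_pool")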

On Fri, May 15, 2015 at 2:13 PM, Evo Eftimov evo.efti...@isecc.com wrote:

 No pools for the moment – for each of the apps using the straightforward
 way with the spark conf param for scheduling = FAIR



 Spark is running in a Standalone Mode



 Are you saying that Configuring Pools is mandatory to get the FAIR
 scheduling working – from the docs it seemed optional to me



 *From:* Tathagata Das [mailto:t...@databricks.com]
 *Sent:* Friday, May 15, 2015 6:45 PM
 *To:* Evo Eftimov
 *Cc:* user
 *Subject:* Re: Spark Fair Scheduler for Spark Streaming - 1.2 and beyond



 How are you configuring the fair scheduler pools?



 On Fri, May 15, 2015 at 8:33 AM, Evo Eftimov evo.efti...@isecc.com
 wrote:

 I have run / submitted a few Spark Streaming apps configured with Fair
 scheduling on Spark Streaming 1.2.0, however they still run in a FIFO mode.
 Is FAIR scheduling supported at all for Spark Streaming apps and from what
 release / version - e.g. 1.3.1




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Fair-Scheduler-for-Spark-Streaming-1-2-and-beyond-tp22902.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Spark Fair Scheduler for Spark Streaming - 1.2 and beyond

2015-05-15 Thread Richard Marscher
It's not a Spark Streaming app, so sorry I'm not sure of the answer to
that. I would assume it should work.

On Fri, May 15, 2015 at 2:22 PM, Evo Eftimov evo.efti...@isecc.com wrote:

 Ok thanks a lot for clarifying that – btw was your application a Spark
 Streaming App – I am also looking for confirmation that FAIR scheduling is
 supported for Spark Streaming Apps



 *From:* Richard Marscher [mailto:rmarsc...@localytics.com]
 *Sent:* Friday, May 15, 2015 7:20 PM
 *To:* Evo Eftimov
 *Cc:* Tathagata Das; user
 *Subject:* Re: Spark Fair Scheduler for Spark Streaming - 1.2 and beyond



 The doc is a bit confusing IMO, but at least for my application I had to
 use a fair pool configuration to get my stages to be scheduled with FAIR.



 On Fri, May 15, 2015 at 2:13 PM, Evo Eftimov evo.efti...@isecc.com
 wrote:

 No pools for the moment – for each of the apps using the straightforward
 way with the spark conf param for scheduling = FAIR



 Spark is running in a Standalone Mode



 Are you saying that Configuring Pools is mandatory to get the FAIR
 scheduling working – from the docs it seemed optional to me



 *From:* Tathagata Das [mailto:t...@databricks.com]
 *Sent:* Friday, May 15, 2015 6:45 PM
 *To:* Evo Eftimov
 *Cc:* user
 *Subject:* Re: Spark Fair Scheduler for Spark Streaming - 1.2 and beyond



 How are you configuring the fair scheduler pools?



 On Fri, May 15, 2015 at 8:33 AM, Evo Eftimov evo.efti...@isecc.com
 wrote:

 I have run / submitted a few Spark Streaming apps configured with Fair
 scheduling on Spark Streaming 1.2.0, however they still run in a FIFO mode.
 Is FAIR scheduling supported at all for Spark Streaming apps and from what
 release / version - e.g. 1.3.1




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Fair-Scheduler-for-Spark-Streaming-1-2-and-beyond-tp22902.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org







Re: Looking inside the 'mapPartitions' transformation, some confused observations

2015-05-11 Thread Richard Marscher
I believe the issue in b and c is that you call iter.size which actually is
going to flush the iterator so the subsequent attempt to put it into a
vector will yield 0 items. You could use an ArrayBuilder for example and
not need to rely on knowing the size of the iterator.
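
For example, a sketch of variant (b) rewritten with an ArrayBuilder so the
iterator is only consumed once (reusing the same `a` as in the code below):

import scala.collection.mutable.ArrayBuilder
import breeze.linalg.DenseVector

val b2 = a.mapPartitions { iter =>
  val builder = ArrayBuilder.make[Int]()
  while (iter.hasNext) builder += iter.next()   // never call iter.size first
  Iterator.single(DenseVector(builder.result()))
}
b2.count()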

On Mon, May 11, 2015 at 2:26 PM, myasuka myas...@live.com wrote:

 As we all know, a partition in Spark is actually an Iterator[T]. For some
 purpose, I want to treat each partition not as an Iterator but as one whole
 object. For example, turn an Iterator[Int] into a
 breeze.linalg.DenseVector[Int]. Thus I use the 'mapPartitions' API to achieve
 this; however, during the implementation I made some confusing observations.
 I use Spark 1.3.0 on a 10-executor-node cluster; below are different attempts:

 import breeze.linalg.DenseVector

 val a = sc.parallelize(1 to 100, 10)

 val b = a.mapPartitions(iter => {
   val v = Array.ofDim[Int](iter.size)
   var ind = 0
   while (iter.hasNext) {
     v(ind) = iter.next
     ind += 1
   }
   println(v.mkString(","))
   Iterator.single[DenseVector[Int]](DenseVector(v))
 })
 b.count()

 val c = a.mapPartitions(iter => {
   val v = Array.ofDim[Int](iter.size)
   iter.copyToArray(v, 0, 10)
   println(v.mkString(","))
   Iterator.single[DenseVector[Int]](DenseVector(v))
 })
 c.count()

 val d = a.mapPartitions(iter => {
   val v = iter.toArray
   println(v.mkString(","))
   Iterator.single[DenseVector[Int]](DenseVector(v))
 })
 d.count()

 I can see the printed output in the executor's stdout. Actually, only attempt
 'd' satisfies my needs; the other attempts only produce a zero dense vector,
 which means the assignment from the iterator to the vector did not happen.

 Hope for explanations for these observations.
 Thanks.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Looking-inside-the-mapPartitions-transformation-some-confused-observations-tp22850.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_2_piece0

2015-05-07 Thread Richard Marscher
By default you would expect to find the logs files for master and workers
in the relative `logs` directory from the root of the Spark installation on
each of the respective nodes in the cluster.

On Thu, May 7, 2015 at 10:27 AM, Wang, Ningjun (LNG-NPV) 
ningjun.w...@lexisnexis.com wrote:

  > Can you check your local and remote logs?



 Where are the log files? I see the following in my Driver program logs as
 well as the Spark UI failed task page



 java.io.IOException: org.apache.spark.SparkException: Failed to get
 broadcast_2_piece0 of broadcast_2



 Here is the detailed stack trace.

 15/05/06 10:48:51 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0,
 LAB4-WIN03.pcc.lexisnexis.com): java.io.IOException: 
 org.apache.spark.SparkException:
 Failed to get broadcast_2_piece0 of broadcast_2

 at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1156)

 at
 org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)

 at
 org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)

 at
 org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)

 at
 org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)

 at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)

 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:61)

 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)

 at org.apache.spark.scheduler.Task.run(Task.scala:64)

 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)

 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

 at java.lang.Thread.run(Thread.java:745)







 Ningjun



 *From:* Jonathan Coveney [mailto:jcove...@gmail.com]
 *Sent:* Wednesday, May 06, 2015 5:23 PM
 *To:* Wang, Ningjun (LNG-NPV)
 *Cc:* Ted Yu; user@spark.apache.org

 *Subject:* Re: java.io.IOException: org.apache.spark.SparkException:
 Failed to get broadcast_2_piece0



 Can you check your local and remote logs?



 2015-05-06 16:24 GMT-04:00 Wang, Ningjun (LNG-NPV) 
 ningjun.w...@lexisnexis.com:

 This problem happen in Spark 1.3.1.  It happen when two jobs are running
 simultaneously each in its own Spark Context.



 I don’t remember seeing this bug in Spark 1.2.0. Is it a new bug
 introduced in Spark 1.3.1?



 Ningjun



 *From:* Ted Yu [mailto:yuzhih...@gmail.com]
 *Sent:* Wednesday, May 06, 2015 11:32 AM
 *To:* Wang, Ningjun (LNG-NPV)
 *Cc:* user@spark.apache.org
 *Subject:* Re: java.io.IOException: org.apache.spark.SparkException:
 Failed to get broadcast_2_piece0



 Which release of Spark are you using ?



 Thanks


 On May 6, 2015, at 8:03 AM, Wang, Ningjun (LNG-NPV) 
 ningjun.w...@lexisnexis.com wrote:

  I run a job on spark standalone cluster and got the exception below



 Here is the line of code that cause problem



  val myRdd: RDD[(String, String, String)] = … // RDD of (docid, category, path)

  myRdd.persist(StorageLevel.MEMORY_AND_DISK_SER)

  val cats: Array[String] = myRdd.map(t => t._2).distinct().collect()  // This line causes the exception





 15/05/06 10:48:51 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0,
 LAB4-WIN03.pcc.lexisnexis.com): java.io.IOException: 
 org.apache.spark.SparkException:
 Failed to get broadcast_2_piece0 of broadcast_2

 at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1156)

 at
 org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)

 at
 org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)

 at
 org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)

 at
 org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)

 at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)

 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:61)

 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)

 at org.apache.spark.scheduler.Task.run(Task.scala:64)

 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)

 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

 at java.lang.Thread.run(Thread.java:745)

 Caused by: org.apache.spark.SparkException: Failed to get
 broadcast_2_piece0 of broadcast_2

 at
 org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBr

 oadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)

 at
 

Re: Spark Job triggers second attempt

2015-05-07 Thread Richard Marscher
Hi,

I think you may want to use this setting?:

spark.task.maxFailures (default: 4) - Number of individual task failures before
giving up on the job. Should be greater than or equal to 1. Number of allowed
retries = this value - 1.
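
If the goal is simply to fail fast while debugging, a minimal sketch (values
are examples only) would be something like:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("fail-fast-debug")
  .set("spark.task.maxFailures", "1")     // first task failure fails the job
  .set("spark.yarn.maxAppAttempts", "1")  // on YARN, a single app attempt (must be >= 1)

val sc = new SparkContext(conf)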

On Thu, May 7, 2015 at 2:34 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 How can I stop Spark from triggering a second attempt when the first one
 fails?
 I do not want to wait for the second attempt to fail as well, so that I can
 debug faster.

 .set("spark.yarn.maxAppAttempts", "0") OR .set("spark.yarn.maxAppAttempts", "1")

 is not helping.

 --
 Deepak




Re: java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_2_piece0

2015-05-07 Thread Richard Marscher
I should also add I've recently seen this issue as well when using collect.
I believe in my case it was related to heap space on the driver program not
being able to handle the returned collection.
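
For illustration, two common mitigations, sketched against the myRdd defined
in the code below:

// 1) Bring back only a bounded preview instead of the full collection
val preview = myRdd.map(t => t._2).distinct().take(100)

// 2) If the whole result really is needed on the driver, raise the driver heap
//    before the driver JVM starts, e.g. `spark-submit --driver-memory 8g ...`.
//    Setting spark.driver.memory on an already-running context has no effect.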

On Thu, May 7, 2015 at 11:05 AM, Richard Marscher rmarsc...@localytics.com
wrote:

 By default you would expect to find the logs files for master and workers
 in the relative `logs` directory from the root of the Spark installation on
 each of the respective nodes in the cluster.

 On Thu, May 7, 2015 at 10:27 AM, Wang, Ningjun (LNG-NPV) 
 ningjun.w...@lexisnexis.com wrote:

  > Can you check your local and remote logs?



 Where are the log files? I see the following in my Driver program logs as
 well as the Spark UI failed task page



 java.io.IOException: org.apache.spark.SparkException: Failed to get
 broadcast_2_piece0 of broadcast_2



 Here is the detailed stack trace.

 15/05/06 10:48:51 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0,
 LAB4-WIN03.pcc.lexisnexis.com): java.io.IOException: 
 org.apache.spark.SparkException:
 Failed to get broadcast_2_piece0 of broadcast_2

 at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1156)

 at
 org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)

 at
 org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)

 at
 org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)

 at
 org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)

 at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)

 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:61)

 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)

 at org.apache.spark.scheduler.Task.run(Task.scala:64)

 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)

 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

 at java.lang.Thread.run(Thread.java:745)







 Ningjun



 *From:* Jonathan Coveney [mailto:jcove...@gmail.com]
 *Sent:* Wednesday, May 06, 2015 5:23 PM
 *To:* Wang, Ningjun (LNG-NPV)
 *Cc:* Ted Yu; user@spark.apache.org

 *Subject:* Re: java.io.IOException: org.apache.spark.SparkException:
 Failed to get broadcast_2_piece0



 Can you check your local and remote logs?



 2015-05-06 16:24 GMT-04:00 Wang, Ningjun (LNG-NPV) 
 ningjun.w...@lexisnexis.com:

 This problem happen in Spark 1.3.1.  It happen when two jobs are running
 simultaneously each in its own Spark Context.



 I don’t remember seeing this bug in Spark 1.2.0. Is it a new bug
 introduced in Spark 1.3.1?



 Ningjun



 *From:* Ted Yu [mailto:yuzhih...@gmail.com]
 *Sent:* Wednesday, May 06, 2015 11:32 AM
 *To:* Wang, Ningjun (LNG-NPV)
 *Cc:* user@spark.apache.org
 *Subject:* Re: java.io.IOException: org.apache.spark.SparkException:
 Failed to get broadcast_2_piece0



 Which release of Spark are you using ?



 Thanks


 On May 6, 2015, at 8:03 AM, Wang, Ningjun (LNG-NPV) 
 ningjun.w...@lexisnexis.com wrote:

  I run a job on spark standalone cluster and got the exception below



 Here is the line of code that cause problem



  val myRdd: RDD[(String, String, String)] = … // RDD of (docid, category, path)

  myRdd.persist(StorageLevel.MEMORY_AND_DISK_SER)

  val cats: Array[String] = myRdd.map(t => t._2).distinct().collect()  // This line causes the exception





 15/05/06 10:48:51 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0,
 LAB4-WIN03.pcc.lexisnexis.com): java.io.IOException: 
 org.apache.spark.SparkException:
 Failed to get broadcast_2_piece0 of broadcast_2

 at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1156)

 at
 org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)

 at
 org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)

 at
 org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)

 at
 org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)

 at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)

 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:61)

 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)

 at org.apache.spark.scheduler.Task.run(Task.scala:64)

 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)

 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

 at java.lang.Thread.run(Thread.java:745)

 Caused

Re: Maximum Core Utilization

2015-05-05 Thread Richard Marscher
Hi,

do you have information on how many partitions/tasks the stage/job is
running? By default there is 1 core per task, and your number of concurrent
tasks may be limiting core utilization.

There are a few settings you could play with, assuming your issue is
related to the above:
spark.default.parallelism
spark.cores.max
spark.task.cpus
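
For example, a quick sketch of widening the parallelism (all numbers are
examples, and the input RDD is a stand-in):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.cores.max", "100")           // let the app claim more cores
  .set("spark.default.parallelism", "200") // default partition count for shuffles

// Or explicitly repartition so there is at least one task per available core
val someRdd = sc.parallelize(1 to 1000000) // stand-in for the real input RDD
val widened = someRdd.repartition(200)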

On Tue, May 5, 2015 at 3:55 PM, Manu Kaul manohar.k...@gmail.com wrote:

 Hi All,
 For a job I am running on Spark with a dataset of say 350,000 lines (not
 big), I am finding that even though my cluster has a large number of cores
 available (like 100 cores), the Spark system seems to stop after using just
 4 cores and after that the runtime is pretty much a straight line no matter
 how many more cores are thrown at it. I am wondering if Spark tries to
 figure out the maximum no. of cores to use based on the size of the
 dataset? If yes, is there a way to disable this feature and force it to use
 all the cores available?

 Thanks,
 Manu

 --

 The greater danger for most of us lies not in setting our aim too high and
 falling short; but in setting our aim too low, and achieving our mark.
 - Michelangelo



Re: Long GC pauses with Spark SQL 1.3.0 and billion row tables

2015-05-04 Thread Richard Marscher
In regards to the large GC pauses, assuming you allocated all 100GB of
memory per worker you may consider running with less memory on your Worker
nodes, or splitting up the available memory on the Worker nodes amongst
several worker instances. The JVM's garbage collection starts to become
very slow as the memory allocation for the heap becomes large. At 100GB it
may not be unusual to see GC take minutes at a time. I believe with YARN or
Standalone clusters you should be able to run multiple smaller JVM
instances on your workers so you can still use your cluster resources but
also won't have a single JVM allocating an unwieldy amount of memory.

On Mon, May 4, 2015 at 2:23 AM, Nick Travers n.e.trav...@gmail.com wrote:

 Could you be more specific in how this is done?

 A DataFrame class doesn't have that method.

 On Sun, May 3, 2015 at 11:07 PM, ayan guha guha.a...@gmail.com wrote:

 You can use custom partitioner to redistribution using partitionby
 On 4 May 2015 15:37, Nick Travers n.e.trav...@gmail.com wrote:

 I'm currently trying to join two large tables (order 1B rows each) using
 Spark SQL (1.3.0) and am running into long GC pauses which bring the job
 to
 a halt.

 I'm reading in both tables using a HiveContext with the underlying files
 stored as Parquet files. I'm using something along the lines of
 HiveContext.sql("SELECT a.col1, b.col2 FROM a JOIN b ON a.col1 = b.col1")
 to set up the join.

 When I execute this (with an action such as .count) I see the first few
 stages complete, but the job eventually stalls. The GC counts keep
 increasing for each executor.

 Running with 6 workers, each with 2T disk and 100GB RAM.

 Has anyone else run into this issue? I'm thinking I might be running into
 issues with the shuffling of the data, but I'm unsure of how to get
 around
 this? Is there a way to redistribute the rows based on the join key
 first,
 and then do the join?

 Thanks in advance.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Long-GC-pauses-with-Spark-SQL-1-3-0-and-billion-row-tables-tp22750.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Extra stage that executes before triggering computation with an action

2015-04-29 Thread Richard Marscher
I'm not sure, but I wonder whether, because you are using the Spark REPL, it
may not represent what a normal runtime execution would look like; it may be
eagerly running a partial DAG as soon as you define an operation that would
cause a shuffle.

What happens if you setup your same set of commands [a-e] in a file and use
the Spark REPL's `load` or `paste` command to load them all at once?

On Wed, Apr 29, 2015 at 2:55 PM, Tom Hubregtsen thubregt...@gmail.com
wrote:

 Thanks for the responses.

 Try removing toDebugString and see what happens. 

 The toDebugString is performed after [d] (the action), as [e]. By then all
 stages are already executed.





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Extra-stage-that-executes-before-triggering-computation-with-an-action-tp22707p22712.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Scalability of group by

2015-04-28 Thread Richard Marscher
Hi,

I can offer a few ideas to investigate in regards to your issue here. I've
run into resource issues doing shuffle operations with a much smaller
dataset than 2B. The data is going to be saved to disk by the BlockManager
as part of the shuffle and then redistributed across the cluster as
relevant to the group by. So the data is going to be replicated during the
operation.

I might suggest trying to allocate more memory for your executors in your
cluster. You might also want to look into configuring more explicitly the
shuffle functionality (
https://spark.apache.org/docs/latest/configuration.html#shuffle-behavior).
Check the disk usage on the worker nodes, in our case we actually had small
disk space to start with and were running out of temporary space for the
shuffle operation.

I believe you should also be able to find more clear errors in logs from
the worker nodes if you haven't checked yet.
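
For illustration, a few of the knobs mentioned above in one place (all values
are examples to experiment with, not recommendations):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.memory", "10g")             // more room per executor
  .set("spark.shuffle.consolidateFiles", "true")   // fewer intermediate shuffle files
  .set("spark.local.dir", "/mnt/large-disk/spark") // spill shuffle data to a big disk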

On Mon, Apr 27, 2015 at 10:02 PM, Ulanov, Alexander alexander.ula...@hp.com
 wrote:

  It works on a smaller dataset of 100 rows. Probably I could find the
 size when it fails using binary search. However, it would not help me
 because I need to work with 2B rows.



 *From:* ayan guha [mailto:guha.a...@gmail.com]
 *Sent:* Monday, April 27, 2015 6:58 PM
 *To:* Ulanov, Alexander
 *Cc:* user@spark.apache.org
 *Subject:* Re: Scalability of group by



 Hi

 Can you test on a smaller dataset to identify if it is cluster issue or
 scaling issue in spark

 On 28 Apr 2015 11:30, Ulanov, Alexander alexander.ula...@hp.com wrote:

  Hi,



 I am running a group by on a dataset of 2B of RDD[Row [id, time, value]]
 in Spark 1.3 as follows:

 “select id, time, first(value) from data group by id, time”



 My cluster is 8 nodes with 16GB RAM and one worker per node. Each executor
 is allocated with 5GB of memory. However, all executors are being lost
 during the query execution and I get “ExecutorLostFailure”.



 Could you suggest what might be the reason for it? Could it be that “group
 by” is implemented as RDD.groupBy so it holds the group by result in
 memory? What is the workaround?



 Best regards, Alexander




Re: Group by order by

2015-04-27 Thread Richard Marscher
It's not related to Spark, but the concept of what you are trying to do
with the data. Grouping by ID means consolidating data for each ID down to
1 row per ID. You can sort by time after this point yes, but you would need
to either take each ID and time value pair OR do some aggregate operation
on the time. That's what the error message is explaining. Maybe you can
describe what you want your results to look like?

Here is some detail about the underlying operations here:

Example Data:
ID |  Time |  SomeVal

1   02-02-15 4
1   02-03-15 5
2   02-02-15 4
2   02-02-15 5
2   02-05-15 2

A.

So if you do Group By ID this means 1 row per ID like below:

ID

1
2

To include Time in this projection you need to aggregate it with a function
to a single value. Then and only then can you use it in the projection and
sort on it.

SELECT id, max(time) FROM sample GROUP BY id SORT BY max(time) desc;

ID  | max(time)
2 02-05-15
1 02-03-15

B.

Or if you do Group by ID, time then you get 1 row per ID and time pair:

ID | Time
1   02-02-15
1   02-03-15
2   02-02-15
2   02-05-15

Notice both rows with ID `2` and time `02-02-15` group down to 1 row in the
results here. In this case you can sort the results by time without using
an aggregate function.

SELECT id, time FROM sample GROUP BY id, time SORT BY time desc;

ID | Time
2   02-05-15
1   02-03-15
1   02-02-15
2   02-02-15
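
And since the original question also asks about doing this with just an RDD:
a rough sketch over the same sample data (note groupByKey pulls all of a key's
values into memory on one executor, so this only works when each ID's group is
small, and the time strings here only sort correctly because they share one
format):

val rows = sc.parallelize(Seq(
  ("1", "02-02-15", 4), ("1", "02-03-15", 5),
  ("2", "02-02-15", 4), ("2", "02-02-15", 5), ("2", "02-05-15", 2)))

val sortedPerId = rows
  .map { case (id, time, value) => (id, (time, value)) }
  .groupByKey()
  .mapValues(_.toSeq.sortBy(_._1))   // sort each ID's records by time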


On Mon, Apr 27, 2015 at 3:28 PM, Ulanov, Alexander alexander.ula...@hp.com
wrote:

  Hi Richard,



 There are several values of time per id. Is there a way to perform group
 by id and sort by time in Spark?



 Best regards, Alexander



 *From:* Richard Marscher [mailto:rmarsc...@localytics.com]
 *Sent:* Monday, April 27, 2015 12:20 PM
 *To:* Ulanov, Alexander
 *Cc:* user@spark.apache.org
 *Subject:* Re: Group by order by



 Hi,



 that error seems to indicate the basic query is not properly expressed. If
 you group by just ID, then that means it would need to aggregate all the
 time values into one value per ID, so you can't sort by it. Thus it tries
 to suggest an aggregate function for time so you can have 1 value per ID
 and properly sort it.



 On Mon, Apr 27, 2015 at 3:07 PM, Ulanov, Alexander 
 alexander.ula...@hp.com wrote:

  Hi,



 Could you suggest what is the best way to do “group by x order by y” in
 Spark?



 When I try to perform it with Spark SQL I get the following error (Spark
 1.3):



 val results = sqlContext.sql("select * from sample group by id order by time")

 org.apache.spark.sql.AnalysisException: expression 'time' is neither
 present in the group by, nor is it an aggregate function. Add to group by
 or wrap in first() if you don't care which value you get.;

 at
 org.apache.spark.sql.catalyst.analysis.CheckAnalysis.failAnalysis(CheckAnalysis.scala:37)



 Is there a way to do it with just RDD?



 Best regards, Alexander





Re: Group by order by

2015-04-27 Thread Richard Marscher
Hi,

that error seems to indicate the basic query is not properly expressed. If
you group by just ID, then that means it would need to aggregate all the
time values into one value per ID, so you can't sort by it. Thus it tries
to suggest an aggregate function for time so you can have 1 value per ID
and properly sort it.

On Mon, Apr 27, 2015 at 3:07 PM, Ulanov, Alexander alexander.ula...@hp.com
wrote:

  Hi,



 Could you suggest what is the best way to do “group by x order by y” in
 Spark?



 When I try to perform it with Spark SQL I get the following error (Spark
 1.3):



 val results = sqlContext.sql("select * from sample group by id order by time")

 org.apache.spark.sql.AnalysisException: expression 'time' is neither
 present in the group by, nor is it an aggregate function. Add to group by
 or wrap in first() if you don't care which value you get.;

 at
 org.apache.spark.sql.catalyst.analysis.CheckAnalysis.failAnalysis(CheckAnalysis.scala:37)



 Is there a way to do it with just RDD?



 Best regards, Alexander



Re: Instantiating/starting Spark jobs programmatically

2015-04-21 Thread Richard Marscher
Could you possibly describe what you are trying to learn how to do in more
detail? Some basics of submitting programmatically:

- Create a SparkContext instance and use that to build your RDDs
- You can only have 1 SparkContext per JVM you are running, so if you need
to satisfy concurrent job requests you would need to manage a SparkContext
as a shared resource on that server. Keep in mind if something goes wrong
with that SparkContext, all running jobs would probably be in a failed
state and you'd need to try to get a new SparkContext.
- There are System.exit calls built into Spark as of now that could kill
your running JVM. We have shadowed some of the most offensive bits within
our own application to work around this. You'd likely want to do that or to
do your own Spark fork. For example, if the SparkContext can't connect to
your cluster master node when it is created, it will System.exit.
- You'll need to provide all of the relevant classes that your platform
uses in the jobs on the classpath of the spark cluster. We do this with a
JAR file loaded from S3 dynamically by a SparkContext, but there are other
options.
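
A bare-bones sketch of the above (the master URL and jar path are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("platform-jobs")
  .setMaster("spark://master-host:7077")
  .setJars(Seq("/path/to/job-classes.jar")) // classes the executors will need

// One long-lived context per JVM, shared across incoming job requests
val sc = new SparkContext(conf)

def countWords(lines: Seq[String]): Long =
  sc.parallelize(lines).flatMap(_.split("\\s+")).count()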

On Mon, Apr 20, 2015 at 10:12 PM, firemonk9 dhiraj.peech...@gmail.com
wrote:

 I have built a data analytics SaaS platform by creating REST endpoints; based
 on the type of job request I invoke the necessary Spark job/jobs and return
 the results as JSON (async). I used yarn-client mode to submit the jobs to the
 YARN cluster.

 hope this helps.





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Instantiating-starting-Spark-jobs-programmatically-tp22577p22584.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Problem with Spark SQL UserDefinedType and sbt assembly

2015-04-16 Thread Richard Marscher
If it fails with sbt-assembly but not without it, then there's always the
likelihood of a classpath issue. What dependencies are you rolling up into
your assembly jar?

On Thu, Apr 16, 2015 at 4:46 PM, Jaonary Rabarisoa jaon...@gmail.com
wrote:

 Any ideas ?

 On Thu, Apr 16, 2015 at 5:04 PM, Jaonary Rabarisoa jaon...@gmail.com
 wrote:

 Dear all,

 Here is an issue that is driving me mad. I wrote a UserDefinedType in order to
 be able to store a custom type in a Parquet file. In my code I just create
 a DataFrame with my custom data type and write it into a Parquet file. When
 I run my code directly inside IDEA everything works like a charm. But when
 I create the assembly jar with sbt assembly and run the same code with
 spark-submit I get the following error:

 *15/04/16 17:02:17 ERROR Executor: Exception in task 0.0 in stage 0.0
 (TID 0)*
 *java.lang.IllegalArgumentException: Unsupported dataType:
 {type:struct,fields:[{name:metadata,type:{type:udt,class:org.apache.spark.vision.types.ImageMetadataUDT,pyClass:null,sqlType:{type:struct,fields:[{name:name,type:string,nullable:true,metadata:{}},{name:encoding,type:string,nullable:true,metadata:{}},{name:cameraId,type:string,nullable:true,metadata:{}},{name:timestamp,type:string,nullable:true,metadata:{}},{name:frameId,type:string,nullable:true,metadata:{}}]}},nullable:true,metadata:{}}]},
 [1.1] failure: `TimestampType' expected but `{' found*


 *{type:struct,fields:[{name:metadata,type:{type:udt,class:org.apache.spark.vision.types.ImageMetadataUDT,pyClass:null,sqlType:{type:struct,fields:[{name:name,type:string,nullable:true,metadata:{}},{name:encoding,type:string,nullable:true,metadata:{}},{name:cameraId,type:string,nullable:true,metadata:{}},{name:timestamp,type:string,nullable:true,metadata:{}},{name:frameId,type:string,nullable:true,metadata:{}}]}},nullable:true,metadata:{}}]}*
 *^*
 *at
 org.apache.spark.sql.types.DataType$CaseClassStringParser$.apply(dataTypes.scala:163)*
 *at
 org.apache.spark.sql.types.DataType$.fromCaseClassString(dataTypes.scala:98)*
 *at
 org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$6.apply(ParquetTypes.scala:402)*
 *at
 org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$6.apply(ParquetTypes.scala:402)*
 *at scala.util.Try.getOrElse(Try.scala:77)*
 *at
 org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromString(ParquetTypes.scala:402)*
 *at
 org.apache.spark.sql.parquet.RowWriteSupport.init(ParquetTableSupport.scala:145)*
 *at
 parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:278)*
 *at
 parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)*
 *at org.apache.spark.sql.parquet.ParquetRelation2.org
 http://org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$writeShard$1(newParquet.scala:691)*
 *at
 org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:713)*
 *at
 org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:713)*
 *at
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)*
 *at org.apache.spark.scheduler.Task.run(Task.scala:64)*
 *at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:210)*
 *at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)*
 *at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)*
 *at java.lang.Thread.run(Thread.java:745)*





Re: sbt-assembly spark-streaming-kinesis-asl error

2015-04-14 Thread Richard Marscher
Hi,

I've gotten an application working with sbt-assembly and spark, thought I'd
present an option. In my experience, trying to bundle any of the Spark
libraries in your uber jar is going to be a major pain. There will be a lot
of deduplication to work through and even if you resolve them it can be
easy to do it incorrectly. I considered it an intractable problem. So the
alternative is to not include those jars in your uber jar. For this to work
you will need the same libraries on the classpath of your Spark cluster and
your driver program (if you are running that as an application and not just
using spark-submit).
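
A hypothetical build.sbt fragment illustrating that split (versions are
examples only):

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"      % "1.3.0" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.3.0" % "provided"
)
// Libraries that are not already on the cluster (for example the Kinesis ASL
// and AWS SDK jars) still have to reach the executors, either bundled in the
// assembly or passed with spark-submit --jars.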

As for your NoClassDefFoundError, you either are missing Joda Time in your
runtime classpath or have conflicting versions. It looks like something
related to AWS wants to use it. Check your uber jar to see if its including
the org/joda/time as well as the classpath of your spark cluster. For
example: I use the Spark 1.3.0 on Hadoop 1.x, which in the 'lib' directory
has an uber jar spark-assembly-1.3.0-hadoop1.0.4.jar. At one point in Spark
1.2 I found a conflict between httpclient versions that my uber jar pulled
in for AWS libraries and the one bundled in the spark uber jar. I hand
patched the spark uber jar to remove the offending httpclient bytecode to
resolve the issue. You may be facing a similar situation.

I hope that gives some ideas for resolving your issue.

Regards,
Rich

On Tue, Apr 14, 2015 at 1:14 PM, Mike Trienis mike.trie...@orcsol.com
wrote:

 Hi Vadim,

 After removing "provided" from "org.apache.spark" %%
 "spark-streaming-kinesis-asl" I ended up with a huge number of deduplicate
 errors:

 https://gist.github.com/trienism/3d6f8d6b7ff5b7cead6a

 It would be nice if you could share some pieces of your mergeStrategy code
 for reference.

 Also, after adding "provided" back to spark-streaming-kinesis-asl and
 submitting the spark job with the spark-streaming-kinesis-asl jar file:

 sh /usr/lib/spark/bin/spark-submit --verbose --jars
 lib/spark-streaming-kinesis-asl_2.10-1.2.0.jar --class com.xxx.DataConsumer
 target/scala-2.10/xxx-assembly-0.1-SNAPSHOT.jar

 I still end up with the following error...

 Exception in thread main java.lang.NoClassDefFoundError:
 org/joda/time/format/DateTimeFormat
 at com.amazonaws.auth.AWS4Signer.clinit(AWS4Signer.java:44)
 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
 at
 sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
 at
 sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
 at java.lang.Class.newInstance(Class.java:379)

 Has anyone else run into this issue?



 On Mon, Apr 13, 2015 at 6:46 PM, Vadim Bichutskiy 
 vadim.bichuts...@gmail.com wrote:

 I don't believe the Kinesis asl should be provided. I used mergeStrategy
 successfully to produce an uber jar.

 Fyi, I've been having trouble consuming data out of Kinesis with Spark
 with no success :(
 Would be curious to know if you got it working.

 Vadim

 On Apr 13, 2015, at 9:36 PM, Mike Trienis mike.trie...@orcsol.com
 wrote:

 Hi All,

 I am having trouble building a fat jar file through sbt-assembly.

 [warn] Merging 'META-INF/NOTICE.txt' with strategy 'rename'
 [warn] Merging 'META-INF/NOTICE' with strategy 'rename'
 [warn] Merging 'META-INF/LICENSE.txt' with strategy 'rename'
 [warn] Merging 'META-INF/LICENSE' with strategy 'rename'
 [warn] Merging 'META-INF/MANIFEST.MF' with strategy 'discard'
 [warn] Merging
 'META-INF/maven/com.thoughtworks.paranamer/paranamer/pom.properties' with
 strategy 'discard'
 [warn] Merging
 'META-INF/maven/com.thoughtworks.paranamer/paranamer/pom.xml' with strategy
 'discard'
 [warn] Merging 'META-INF/maven/commons-dbcp/commons-dbcp/pom.properties'
 with strategy 'discard'
 [warn] Merging 'META-INF/maven/commons-dbcp/commons-dbcp/pom.xml' with
 strategy 'discard'
 [warn] Merging 'META-INF/maven/commons-pool/commons-pool/pom.properties'
 with strategy 'discard'
 [warn] Merging 'META-INF/maven/commons-pool/commons-pool/pom.xml' with
 strategy 'discard'
 [warn] Merging 'META-INF/maven/joda-time/joda-time/pom.properties' with
 strategy 'discard'
 [warn] Merging 'META-INF/maven/joda-time/joda-time/pom.xml' with strategy
 'discard'
 [warn] Merging 'META-INF/maven/log4j/log4j/pom.properties' with strategy
 'discard'
 [warn] Merging 'META-INF/maven/log4j/log4j/pom.xml' with strategy
 'discard'
 [warn] Merging 'META-INF/maven/org.joda/joda-convert/pom.properties' with
 strategy 'discard'
 [warn] Merging 'META-INF/maven/org.joda/joda-convert/pom.xml' with
 strategy 'discard'
 [warn] Merging 'META-INF/maven/org.slf4j/slf4j-api/pom.properties' with
 strategy 'discard'
 [warn] Merging 'META-INF/maven/org.slf4j/slf4j-api/pom.xml' with strategy
 'discard'
 [warn] Merging 'META-INF/maven/org.slf4j/slf4j-log4j12/pom.properties'
 with strategy 'discard'
 [warn] Merging