change default storage level

2015-07-09 Thread Michal Čizmazia
Is there a way how to change the default storage level?

If not, how can I properly change the storage level wherever necessary, if
my input and intermediate results do not fit into memory?

In this example:

context.wholeTextFiles(...)
.flatMap(s => ...)
.flatMap(s => ...)

Does persist() need to be called after every transformation?

 context.wholeTextFiles(...)
.persist(StorageLevel.MEMORY_AND_DISK)
.flatMap(s => ...)
.persist(StorageLevel.MEMORY_AND_DISK)
.flatMap(s => ...)
.persist(StorageLevel.MEMORY_AND_DISK)

 Thanks!


Re: Spark Mesos task rescheduling

2015-07-09 Thread Iulian Dragoș
On Thu, Jul 9, 2015 at 12:32 PM, besil sbernardine...@beintoo.com wrote:

 Hi,

 We are experiencing scheduling errors due to Mesos slaves failing.
 It seems to be an open bug; more information can be found here:

 https://issues.apache.org/jira/browse/SPARK-3289

 According to this  link
 
 https://mail-archives.apache.org/mod_mbox/mesos-user/201310.mbox/%3ccaakwvaxprrnrcdlazcybnmk1_9elyheodaf8urf8ssrlbac...@mail.gmail.com%3E
 
 from the mail archive, it seems that Spark doesn't reschedule LOST tasks to
 active executors, but keeps trying to reschedule them on the failed host.


Are you running in fine-grained mode? In coarse-grained mode it seems that
Spark will notice a slave that fails repeatedly and would not accept offers
on that slave:

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala#L188



 We would like to dynamically resize our Mesos cluster (adding or removing
 machines - using an AWS autoscaling group), but this bug kills our running
 applications if a Mesos slave running a Spark executor is shut down.


I think what you need is dynamic allocation, which should be available soon
(PR: 4984 https://github.com/apache/spark/pull/4984).


 Is there any known workaround?

 Thank you



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Mesos-task-rescheduling-tp23740.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 

--
Iulian Dragos

--
Reactive Apps on the JVM
www.typesafe.com


GraphX Synth Benchmark

2015-07-09 Thread AshutoshRaghuvanshi
I am running spark cluster over ssh in standalone mode,

I have run pagerank LiveJounral example:

MASTER=spark://172.17.27.12:7077 bin/run-example graphx.SynthBenchmark
-app=pagerank -niters=100 -nverts=4847571 > Output/soc-liveJounral.txt

It's been running for more than 2 hours. I guess this is not normal; what am I
doing wrong?

system details:
4 nodes (1+3)
40 cores each, 64G memory, out of which I have given the Spark executor 50G

One more thing I notice: one of the servers is used more than the others.

Please help ASAP.

Thank you
http://apache-spark-user-list.1001560.n3.nabble.com/file/n23747/13.png 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-Synth-Benchmark-tp23747.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Accessing Spark Web UI from another place than where the job actually ran

2015-07-09 Thread rroxanaioana


I have a Spark cluster with 1 master and 9 nodes, running in standalone mode.
I do not have access to a web browser from any of the nodes in the cluster
(I am connecting to the nodes through ssh -- it is a Grid'5000 cluster). I was
wondering, is there any possibility to access the Spark Web UI in this case? I
tried copying the logs from my cluster into SPARK_PATH/work on my local
machine (to give the impression that the jobs that ran on the cluster were
run on my local machine). This idea came after reading this part of the
documentation:

If an application has logged events over the course of its lifetime, then
the Standalone master’s web UI will automatically re-render the
application’s UI after the application has finished.
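
For reference, that re-rendering only happens if the application actually
wrote event logs to a location the master can read. A minimal sketch of the
relevant settings in conf/spark-defaults.conf (the log directory is
illustrative and must exist before the application starts):

spark.eventLog.enabled   true
spark.eventLog.dir       file:///tmp/spark-events

The same directory can also be served by the Spark history server through
spark.history.fs.logDirectory.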

But it did not work. What I can see in the UI is:
Applications: 0 Running, 0 
Completed Drivers: 0 Running, 0 Completed 
Status: ALIVE

Thank you!




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Accessing-Spark-Web-UI-from-another-place-than-where-the-job-actually-ran-tp23745.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



[SparkSQL] Incorrect ROLLUP results

2015-07-09 Thread Yana Kadiyska
Hi folks, I just re-wrote a query from using UNION ALL to use with rollup
and I'm seeing some unexpected behavior. I'll open a JIRA if needed but
wanted to check if this is user error. Here is my code:

case class KeyValue(key: Int, value: String)
val df = sc.parallelize(1 to 50).map(i => KeyValue(i, i.toString)).toDF

df.registerTempTable("foo")

sqlContext.sql("select count(*) as cnt, value as key,GROUPING__ID from
foo group by value with rollup").show(100)


sqlContext.sql("select count(*) as cnt, key % 100 as key,GROUPING__ID
from foo group by key%100 with rollup").show(100)

Grouping by value does the right thing, I get one group 0 with the overall
count. But grouping by an expression (key%100) produces weird results -- it
appears that group 1 results are replicated as group 0. Am I doing
something wrong or is this a bug?


Re: Some BlockManager Doubts

2015-07-09 Thread Shixiong Zhu
MEMORY_AND_DISK will use the disk if there is not enough memory. If there is
not enough memory when putting a MEMORY_AND_DISK block, BlockManager will store
it to disk. And if a MEMORY_AND_DISK block is dropped from memory,
MemoryStore will call BlockManager.dropFromMemory to store it to disk; see
MemoryStore.ensureFreeSpace for details.

Best Regards,
Shixiong Zhu

2015-07-09 19:17 GMT+08:00 Dibyendu Bhattacharya 
dibyendu.bhattach...@gmail.com:

 Hi ,

 Just would like to clarify few doubts I have how BlockManager behaves .
 This is mostly in regards to Spark Streaming Context .

 There are two possible cases Blocks may get dropped / not stored in memory

 Case 1: While writing a block with the MEMORY_ONLY setting, if the node's
 BlockManager does not have enough memory to unroll the block, the block won't
 be stored in memory and the Receiver will throw an error while writing it.
 If the StorageLevel uses disk (as with MEMORY_AND_DISK), blocks will be
 stored to disk ONLY IF the BlockManager is not able to unroll them into
 memory. This is fine while receiving the blocks, but this logic has an issue
 when old blocks are chosen to be dropped from memory, as in Case 2.

 Case 2: Now say that, for either the MEMORY_ONLY or MEMORY_AND_DISK setting,
 blocks were successfully stored in memory in Case 1. What happens if memory
 usage goes beyond a certain threshold? The BlockManager starts dropping LRU
 blocks from memory that were successfully stored while receiving.

 The primary issue I see here is that, while dropping blocks in Case 2,
 Spark does not check whether the storage level uses disk (MEMORY_AND_DISK),
 and even with disk-backed storage levels blocks are dropped from memory
 without being written to disk.
 Or perhaps the issue is, in the first place, that blocks are NOT written
 to disk simultaneously in Case 1. I understand this would impact throughput,
 but this design may throw a BlockNotFound error if blocks are chosen to be
 dropped even when the StorageLevel uses disk.

 Any thoughts ?

 Regards,
 Dibyendu



Re: Spark Mesos task rescheduling

2015-07-09 Thread Silvio Bernardinello
Hi,

Thank you for confirming my doubts and for the link.
Yes, we actually run in fine-grained mode because we would like to
dynamically resize our cluster as needed (thank you for the dynamic
allocation link).

However, we tried coarse-grained mode and Mesos does not seem to relaunch the
failed task.
Maybe there is a timeout before it tries to relaunch, but I'm not aware of
one.



On Thu, Jul 9, 2015 at 5:13 PM, Iulian Dragoș iulian.dra...@typesafe.com
wrote:



 On Thu, Jul 9, 2015 at 12:32 PM, besil sbernardine...@beintoo.com wrote:

 Hi,

 We are experimenting scheduling errors due to mesos slave failing.
 It seems to be an open bug, more information can be found here.

 https://issues.apache.org/jira/browse/SPARK-3289

 According to this  link
 
 https://mail-archives.apache.org/mod_mbox/mesos-user/201310.mbox/%3ccaakwvaxprrnrcdlazcybnmk1_9elyheodaf8urf8ssrlbac...@mail.gmail.com%3E
 
 from mail archive, it seems that Spark doesn't reschedule LOST tasks to
 active executors, but keep trying rescheduling it on the failed host.


 Are you running in fine-grained mode? In coarse-grained mode it seems that
 Spark will notice a slave that fails repeatedly and would not accept offers
 on that slave:


 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala#L188



 We would like to dynamically resize our Mesos cluster (adding or removing
 machines - using an AWS autoscaling group), but this bug kills our running
 applications if a Mesos slave running a Spark executor is shut down.


 I think what you need is dynamic allocation, which should be available
 soon (PR: 4984 https://github.com/apache/spark/pull/4984).


 Is any known workaround?

 Thank you



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Mesos-task-rescheduling-tp23740.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




 --

 --
 Iulian Dragos

 --
 Reactive Apps on the JVM
 www.typesafe.com




Re: change default storage level

2015-07-09 Thread Shixiong Zhu
Spark won't store RDDs in memory unless you use a memory StorageLevel. By
default, your input and intermediate results won't be put into memory. You
can call persist() if you want to avoid duplicate computation or reading.
E.g.,

val r1 = context.wholeTextFiles(...)
val r2 = r1.flatMap(s => ...)
val r3 = r2.filter(...)...
r3.saveAsTextFile(...)
val r4 = r2.map(...)...
r4.saveAsTextFile(...)

In the above example, r2 will be used twice. To speed up the computation,
you can call r2.persist(StorageLevel.MEMORY_ONLY) to store r2 in memory. Then
r4 will use the data of r2 in memory directly. E.g.,

val r1 = context.wholeTextFiles(...)
val r2 = r1.flatMap(s => ...)
r2.persist(StorageLevel.MEMORY_ONLY)
val r3 = r2.filter(...)...
r3.saveAsTextFile(...)
val r4 = r2.map(...)...
r4.saveAsTextFile(...)

See
http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence
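
As far as I know there is no global setting for a default RDD storage level;
if you want a single place to control it, one option is a small helper like
the sketch below (the object and method names are illustrative, not part of
the Spark API):

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// Choose the storage level once and reuse it, instead of repeating
// StorageLevel.MEMORY_AND_DISK at every call site.
object Persistence {
  val defaultLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK
  def persisted[T](rdd: RDD[T]): RDD[T] = rdd.persist(defaultLevel)
}

// Usage: persist only the RDDs that are reused, not every transformation, e.g.
// val r2 = Persistence.persisted(r1.flatMap(s => ...))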


Best Regards,
Shixiong Zhu

2015-07-09 22:09 GMT+08:00 Michal Čizmazia mici...@gmail.com:

 Is there a way how to change the default storage level?

 If not, how can I properly change the storage level wherever necessary, if
 my input and intermediate results do not fit into memory?

 In this example:

 context.wholeTextFiles(...)
 .flatMap(s - ...)
 .flatMap(s - ...)

 Does persist() need to be called after every transformation?

  context.wholeTextFiles(...)
 .persist(StorageLevel.MEMORY_AND_DISK)
 .flatMap(s - ...)
 .persist(StorageLevel.MEMORY_AND_DISK)
 .flatMap(s - ...)
 .persist(StorageLevel.MEMORY_AND_DISK)

  Thanks!




Re: spark streaming kafka compatibility

2015-07-09 Thread Shushant Arora
Thanks Cody. So does that mean that if the old Kafka consumer 0.8.1.1 works
with a Kafka cluster version 0.8.2, then Spark Streaming 1.3 should also work?

 I have tested a standalone Kafka consumer 0.8.0 with a Kafka cluster
0.8.2 and that works.

On Thu, Jul 9, 2015 at 9:58 PM, Cody Koeninger c...@koeninger.org wrote:

 It's the consumer version.  Should work with 0.8.2 clusters.

 On Thu, Jul 9, 2015 at 11:10 AM, Shushant Arora shushantaror...@gmail.com
  wrote:

 Does spark streaming 1.3 requires kafka version 0.8.1.1 and is not
 compatible with kafka 0.8.2 ?

 As per maven dependency of spark streaming 1.3 with kafka


 <dependencies>
   <dependency>
     <groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.10</artifactId>
     <version>1.3.0</version><scope>provided</scope>
     <exclusions>
       <exclusion><artifactId>spark-core_2.10</artifactId><groupId>org.apache.spark</groupId></exclusion>
     </exclusions>
   </dependency>
   <dependency>
     <groupId>org.apache.kafka</groupId><artifactId>kafka_2.10</artifactId>
     <version>0.8.1.1</version><scope>compile</scope>
     <exclusions>
       <exclusion><artifactId>jmxri</artifactId><groupId>com.sun.jmx</groupId></exclusion>
       <exclusion><artifactId>jmxtools</artifactId><groupId>com.sun.jdmk</groupId></exclusion>
       <exclusion><artifactId>jopt-simple</artifactId><groupId>net.sf.jopt-simple</groupId></exclusion>
       <exclusion><artifactId>slf4j-simple</artifactId><groupId>org.slf4j</groupId></exclusion>
       <exclusion><artifactId>zookeeper</artifactId><groupId>org.apache.zookeeper</groupId></exclusion>
     </exclusions>
   </dependency>
 </dependencies>


 Is 0.8.1.1 is consumer version or it means spark streaming 1.3 is
 compatible with kafka cluster version 0.8.1.1 only and not with 0.8.2 ?





orderBy + cache is invoking work on mesos cluster

2015-07-09 Thread Corey Stubbs
Spark Version: 1.3.1
Cluster: Mesos 0.22.0
Scala Version: 2.10.4

I am seeing work done on my cluster when invoking cache on an RDD. I would
have expected the last line of the code below not to invoke any cluster
work. Is there some condition under which cache will do cluster work?


val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
// work is done to load the json into the dataframe
val people = sc.parallelize(
  """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil
)
val peopleDF = sqlContext.jsonRDD(people).toDF()
// No work is done for the orderBy, as expected
val orderBy = peopleDF.orderBy("name")
// Jobs are run when invoking cache; the expectation was that nothing would
// run on the cluster
val orderByCache = orderBy.cache




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/orderBy-cache-is-invoking-work-on-mesos-cluster-tp23749.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: [SparkSQL] Incorrect ROLLUP results

2015-07-09 Thread ayan guha
Can you please post result of show()?
On 10 Jul 2015 01:00, Yana Kadiyska yana.kadiy...@gmail.com wrote:

 Hi folks, I just re-wrote a query from using UNION ALL to use with
 rollup and I'm seeing some unexpected behavior. I'll open a JIRA if needed
 but wanted to check if this is user error. Here is my code:

 case class KeyValue(key: Int, value: String)
 val df = sc.parallelize(1 to 50).map(i=KeyValue(i, i.toString)).toDF

 df.registerTempTable(foo)

 sqlContext.sql(“select count(*) as cnt, value as key,GROUPING__ID from foo 
 group by value with rollup”).show(100)


 sqlContext.sql(“select count(*) as cnt, key % 100 as key,GROUPING__ID from 
 foo group by key%100 with rollup”).show(100)

 ​

 Grouping by value does the right thing, I get one group 0 with the overall
 count. But grouping by expression (key%100) produces weird results --
 appears that group 1 results are replicated as group 0. Am I doing
 something wrong or is this a bug?



Re: PySpark without PySpark

2015-07-09 Thread Sujit Pal
Hi Ashish,

Your 00-pyspark-setup file looks very different from mine (and from the one
described in the blog post). Questions:

1) Do you have SPARK_HOME set up in your environment? Because if not, it
sets it to None in your code. You should provide the path to your Spark
installation. In my case I have spark-1.3.1 installed under $HOME/Software
and the code block under # Configure the environment (or yellow highlight
in the code below) reflects that.
2) Is there a python2 or python subdirectory under the root of your Spark
installation? In my case it's python, not python2. This contains the
Python bindings for spark, so the block under # Add the PySpark/py4j to
the Python path (or green highlight in the code below) adds it to the
Python sys.path so things like pyspark.SparkContext are accessible in your
Python environment.

import os
import sys

# Configure the environment
if 'SPARK_HOME' not in os.environ:
    os.environ['SPARK_HOME'] = "/Users/palsujit/Software/spark-1.3.1"

# Create a variable for our root path
SPARK_HOME = os.environ['SPARK_HOME']

# Add the PySpark/py4j to the Python Path
sys.path.insert(0, os.path.join(SPARK_HOME, "python", "build"))
sys.path.insert(0, os.path.join(SPARK_HOME, "python"))

Hope this fixes things for you.

-sujit


On Wed, Jul 8, 2015 at 9:52 PM, Ashish Dutt ashish.du...@gmail.com wrote:

 Hi Sujit,
 Thanks for your response.

 So I opened a new notebook using the command "ipython notebook --profile
 spark" and tried the sequence of commands. I am getting errors. Attached is
 the screenshot of the same.
 Also I am attaching the 00-pyspark-setup.py for your reference. It looks
 like I have written something wrong here, but I cannot seem to figure out
 what it is.

 Thank you for your help


 Sincerely,
 Ashish Dutt

 On Thu, Jul 9, 2015 at 11:53 AM, Sujit Pal sujitatgt...@gmail.com wrote:

 Hi Ashish,

  Nice post.
 Agreed, kudos to the author of the post, Benjamin Benfort of District
 Labs.

  Following your post, I get this problem;
 Again, not my post.

 I did try setting up IPython with the Spark profile for the edX Intro to
 Spark course (because I didn't want to use the Vagrant container) and it
 worked flawlessly with the instructions provided (on OSX). I haven't used
 the IPython/PySpark environment beyond very basic tasks since then though,
 because my employer has a Databricks license which we were already using
 for other stuff and we ended up doing the labs on Databricks.

 Looking at your screenshot though, I don't see why you think its picking
 up the default profile. One simple way of checking to see if things are
 working is to open a new notebook and try this sequence of commands:

 from pyspark import SparkContext
 sc = SparkContext("local", "pyspark")
 sc

 You should see something like this after a little while:
 <pyspark.context.SparkContext at 0x1093c9b10>

 While the context is being instantiated, you should also see lots of log
 lines scroll by on the terminal where you started the ipython notebook
 --profile spark command - these log lines are from Spark.

 Hope this helps,
 Sujit


 On Wed, Jul 8, 2015 at 6:04 PM, Ashish Dutt ashish.du...@gmail.com
 wrote:

 Hi Sujit,
 Nice post.. Exactly what I had been looking for.
 I am relatively a beginner with Spark and real time data processing.
 We have a server with CDH5.4 with 4 nodes. The spark version in our
 server is 1.3.0
 On my laptop I have spark 1.3.0 too and its using Windows 7 environment.
 As per point 5 of your post I am able to invoke pyspark locally as in a
 standalone mode.

 Following your post, I get this problem;

 1. In the section "Using IPython notebook with Spark" I cannot understand
 why it is picking up the default profile and not the pyspark profile. I am
 sure it is because of the path variables. Attached is the screenshot. Can
 you suggest how to solve this?

 Currently the path variables on my laptop are:
 SPARK_HOME=C:\SPARK-1.3.0\BIN, JAVA_HOME=C:\PROGRAM
 FILES\JAVA\JDK1.7.0_79, HADOOP_HOME=D:\WINUTILS, M2_HOME=D:\MAVEN\BIN,
 MAVEN_HOME=D:\MAVEN\BIN, PYTHON_HOME=C:\PYTHON27\, SBT_HOME=C:\SBT\


 Sincerely,
 Ashish Dutt
 PhD Candidate
 Department of Information Systems
 University of Malaya, Lembah Pantai,
 50603 Kuala Lumpur, Malaysia

 On Thu, Jul 9, 2015 at 4:56 AM, Sujit Pal sujitatgt...@gmail.com
 wrote:

 You are welcome Davies. Just to clarify, I didn't write the post (not
 sure if my earlier post gave that impression, apologies if so), although I
 agree it's great :-).

 -sujit


 On Wed, Jul 8, 2015 at 10:36 AM, Davies Liu dav...@databricks.com
 wrote:

 Great post, thanks for sharing with us!


 On Wed, Jul 8, 2015 at 9:59 AM, Sujit Pal sujitatgt...@gmail.com
 wrote:
  Hi Julian,
 
  I recently built a Python+Spark application to do search relevance
  analytics. I use spark-submit to submit PySpark jobs to a Spark
 cluster on
  EC2 (so I don't use the PySpark shell, hopefully thats what you are
 looking
  for). Can't share the code, but the basic approach is covered in
 this blog
  post 

Re: [SparkSQL] Incorrect ROLLUP results

2015-07-09 Thread Yana Kadiyska
+---+---+---+
|cnt|_c1|grp|
+---+---+---+
|  1| 31|  0|
|  1| 31|  1|
|  1|  4|  0|
|  1|  4|  1|
|  1| 42|  0|
|  1| 42|  1|
|  1| 15|  0|
|  1| 15|  1|
|  1| 26|  0|
|  1| 26|  1|
|  1| 37|  0|
|  1| 10|  0|
|  1| 37|  1|
|  1| 10|  1|
|  1| 48|  0|
|  1| 21|  0|
|  1| 48|  1|
|  1| 21|  1|
|  1| 32|  0|
|  1| 32|  1|
+---+---+---+

​

On Thu, Jul 9, 2015 at 11:54 AM, ayan guha guha.a...@gmail.com wrote:

 Can you please post result of show()?
 On 10 Jul 2015 01:00, Yana Kadiyska yana.kadiy...@gmail.com wrote:

 Hi folks, I just re-wrote a query from using UNION ALL to use with
 rollup and I'm seeing some unexpected behavior. I'll open a JIRA if needed
 but wanted to check if this is user error. Here is my code:

 case class KeyValue(key: Int, value: String)
 val df = sc.parallelize(1 to 50).map(i=KeyValue(i, i.toString)).toDF

 df.registerTempTable(foo)

 sqlContext.sql(“select count(*) as cnt, value as key,GROUPING__ID from foo 
 group by value with rollup”).show(100)


 sqlContext.sql(“select count(*) as cnt, key % 100 as key,GROUPING__ID from 
 foo group by key%100 with rollup”).show(100)

 ​

 Grouping by value does the right thing, I get one group 0 with the
 overall count. But grouping by expression (key%100) produces weird results
 -- appears that group 1 results are replicated as group 0. Am I doing
 something wrong or is this a bug?




Re: spark streaming kafka compatibility

2015-07-09 Thread Cody Koeninger
Yes, it should work, let us know if not.

On Thu, Jul 9, 2015 at 11:34 AM, Shushant Arora shushantaror...@gmail.com
wrote:

 Thanks cody, so is it means if old kafka consumer 0.8.1.1  works with
 kafka cluster version 0.8.2 then spark streaming 1.3 should also work?

  I have tested standalone consumer kafka consumer 0.8.0 with kafka cluster
 0.8.2 and that works.

 On Thu, Jul 9, 2015 at 9:58 PM, Cody Koeninger c...@koeninger.org wrote:

 It's the consumer version.  Should work with 0.8.2 clusters.

 On Thu, Jul 9, 2015 at 11:10 AM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 Does spark streaming 1.3 requires kafka version 0.8.1.1 and is not
 compatible with kafka 0.8.2 ?

 As per maven dependency of spark streaming 1.3 with kafka


 <dependencies>
   <dependency>
     <groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.10</artifactId>
     <version>1.3.0</version><scope>provided</scope>
     <exclusions>
       <exclusion><artifactId>spark-core_2.10</artifactId><groupId>org.apache.spark</groupId></exclusion>
     </exclusions>
   </dependency>
   <dependency>
     <groupId>org.apache.kafka</groupId><artifactId>kafka_2.10</artifactId>
     <version>0.8.1.1</version><scope>compile</scope>
     <exclusions>
       <exclusion><artifactId>jmxri</artifactId><groupId>com.sun.jmx</groupId></exclusion>
       <exclusion><artifactId>jmxtools</artifactId><groupId>com.sun.jdmk</groupId></exclusion>
       <exclusion><artifactId>jopt-simple</artifactId><groupId>net.sf.jopt-simple</groupId></exclusion>
       <exclusion><artifactId>slf4j-simple</artifactId><groupId>org.slf4j</groupId></exclusion>
       <exclusion><artifactId>zookeeper</artifactId><groupId>org.apache.zookeeper</groupId></exclusion>
     </exclusions>
   </dependency>
 </dependencies>


 Is 0.8.1.1 is consumer version or it means spark streaming 1.3 is
 compatible with kafka cluster version 0.8.1.1 only and not with 0.8.2 ?






Re: GraphX Synth Benchmark

2015-07-09 Thread Khaled Ammar
Hi,

I am not a Spark expert, but I found that passing a small number of partitions
might help. Try the option --numEPart=$partitions where
partitions=3 (the number of workers) or at most 3*40 (the total number of cores).
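
For example, reusing the invocation from the original mail with the suggested
value (a sketch only; the file name is kept from that mail):

MASTER=spark://172.17.27.12:7077 bin/run-example graphx.SynthBenchmark \
  -app=pagerank -niters=100 -nverts=4847571 --numEPart=3 > Output/soc-liveJounral.txt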

Thanks,
-Khaled

On Thu, Jul 9, 2015 at 11:37 AM, AshutoshRaghuvanshi 
ashutosh.raghuvans...@gmail.com wrote:

 I am running spark cluster over ssh in standalone mode,

 I have run pagerank LiveJounral example:

 MASTER=spark://172.17.27.12:7077 bin/run-example graphx.SynthBenchmark
 -app=pagerank -niters=100 -nverts=4847571  Output/soc-liveJounral.txt

 its been running for more than 2hours, I guess this is not normal, what am
 i
 doing wrong?

 system details:
 4 nodes (1+3)
 40 cores each, 64G memory out of which I have given spark.executer 50G

 one more this I notice one of the server is used more than others.

 Please help ASAP.

 Thank you
 http://apache-spark-user-list.1001560.n3.nabble.com/file/n23747/13.png



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-Synth-Benchmark-tp23747.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
Thanks,
-Khaled


spark streaming kafka compatibility

2015-07-09 Thread Shushant Arora
Does Spark Streaming 1.3 require Kafka version 0.8.1.1, and is it not
compatible with Kafka 0.8.2?

As per maven dependency of spark streaming 1.3 with kafka

<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.10</artifactId>
    <version>1.3.0</version><scope>provided</scope>
    <exclusions>
      <exclusion><artifactId>spark-core_2.10</artifactId><groupId>org.apache.spark</groupId></exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>org.apache.kafka</groupId><artifactId>kafka_2.10</artifactId>
    <version>0.8.1.1</version><scope>compile</scope>
    <exclusions>
      <exclusion><artifactId>jmxri</artifactId><groupId>com.sun.jmx</groupId></exclusion>
      <exclusion><artifactId>jmxtools</artifactId><groupId>com.sun.jdmk</groupId></exclusion>
      <exclusion><artifactId>jopt-simple</artifactId><groupId>net.sf.jopt-simple</groupId></exclusion>
      <exclusion><artifactId>slf4j-simple</artifactId><groupId>org.slf4j</groupId></exclusion>
      <exclusion><artifactId>zookeeper</artifactId><groupId>org.apache.zookeeper</groupId></exclusion>
    </exclusions>
  </dependency>
</dependencies>


Is 0.8.1.1 the consumer version, or does it mean that Spark Streaming 1.3 is
compatible with Kafka cluster version 0.8.1.1 only and not with 0.8.2?


Re: spark streaming kafka compatibility

2015-07-09 Thread Cody Koeninger
It's the consumer version.  Should work with 0.8.2 clusters.

On Thu, Jul 9, 2015 at 11:10 AM, Shushant Arora shushantaror...@gmail.com
wrote:

 Does spark streaming 1.3 requires kafka version 0.8.1.1 and is not
 compatible with kafka 0.8.2 ?

 As per maven dependency of spark streaming 1.3 with kafka


 <dependencies>
   <dependency>
     <groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.10</artifactId>
     <version>1.3.0</version><scope>provided</scope>
     <exclusions>
       <exclusion><artifactId>spark-core_2.10</artifactId><groupId>org.apache.spark</groupId></exclusion>
     </exclusions>
   </dependency>
   <dependency>
     <groupId>org.apache.kafka</groupId><artifactId>kafka_2.10</artifactId>
     <version>0.8.1.1</version><scope>compile</scope>
     <exclusions>
       <exclusion><artifactId>jmxri</artifactId><groupId>com.sun.jmx</groupId></exclusion>
       <exclusion><artifactId>jmxtools</artifactId><groupId>com.sun.jdmk</groupId></exclusion>
       <exclusion><artifactId>jopt-simple</artifactId><groupId>net.sf.jopt-simple</groupId></exclusion>
       <exclusion><artifactId>slf4j-simple</artifactId><groupId>org.slf4j</groupId></exclusion>
       <exclusion><artifactId>zookeeper</artifactId><groupId>org.apache.zookeeper</groupId></exclusion>
     </exclusions>
   </dependency>
 </dependencies>


 Is 0.8.1.1 is consumer version or it means spark streaming 1.3 is
 compatible with kafka cluster version 0.8.1.1 only and not with 0.8.2 ?




What is a best practice for passing environment variables to Spark workers?

2015-07-09 Thread dgoldenberg
I have about 20 environment variables to pass to my Spark workers. Even
though they're in the init scripts on the Linux box, the workers don't see
these variables.

Does Spark do something to shield itself from what may be defined in the
environment?

I see multiple pieces of info on how to pass the env vars into workers and
they seem dated and/or unclear.

Here:
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-pass-config-variables-to-workers-tt5780.html

SparkConf conf = new SparkConf(); 
conf.set("spark.myapp.myproperty", propertyValue);
OR
set them in spark-defaults.conf, as in
spark.config.one value
spark.config.two value2

In another posting,
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-set-environment-variable-for-a-spark-job-tt3180.html:
conf.setExecutorEnv("ORACLE_HOME", "myOraHome")
conf.setExecutorEnv("SPARK_JAVA_OPTS",
"-Djava.library.path=/my/custom/path")

The configuration guide talks about
spark.executorEnv.[EnvironmentVariableName] -- Add the environment variable
specified by EnvironmentVariableName to the Executor process. The user can
specify multiple of these to set multiple environment variables.

Then there are mentions of SPARK_JAVA_OPTS which seems to be deprecated (?)

What is the easiest/cleanest approach here?  Ideally, I'd not want to burden
my driver program with explicit knowledge of all the env vars that are
needed on the worker side.  I'd also like to avoid having to jam them into
spark-defaults.conf since they're already set in the system init scripts, so
why duplicate.

I suppose one approach would be to namespace all my vars to start with a
well-known prefix, then cycle through the env in the driver and stuff all
these variables into the Spark context.  If I'm doing that, would I want to 

conf.set("spark.myapp.myproperty", propertyValue);

and is the "spark." prefix necessary? or was that just part of the example?

or would I want to

conf.setExecutorEnv("MYPREFIX_MY_VAR_1", "some-value");

Thanks.
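
A sketch of the prefix approach in Scala (the MYAPP_ prefix is illustrative;
spark.executorEnv.* is the documented way to set executor environment
variables):

import org.apache.spark.SparkConf

// Forward every driver-side env var with a known prefix to the executors.
val conf = new SparkConf()
sys.env.filterKeys(_.startsWith("MYAPP_")).foreach { case (k, v) =>
  conf.setExecutorEnv(k, v)   // equivalent to conf.set("spark.executorEnv." + k, v)
}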







--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/What-is-a-best-practice-for-passing-environment-variables-to-Spark-workers-tp23751.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How to ignore features in mllib

2015-07-09 Thread Arun Luthra
Is it possible to ignore features in mllib? In other words, I would like to
have some 'pass-through' data, Strings for example, attached to training
examples and test data.

A related stackoverflow question:
http://stackoverflow.com/questions/30739283/spark-mllib-how-to-ignore-features-when-training-a-classifier

Arun


Scheduler delay vs. Getting result time

2015-07-09 Thread hbogert
Hi, 

In the Spark UI, under “Show additional metrics”, there are two extra
metrics you can show 
.1 Scheduler delay
.2 and Getting result time

When hovering over “Scheduler Delay” it says (among other things):
…time to send task result from executor…

When hovering “Getting result time”:
Time that the driver spends fetching task results from workers.

What are the differences between the two?

In my case I’m benchmarking with some sleep commands and returning some big
arrays, per task, to emulate execution time and network communication
respectively. I can’t see any “Getting Result Time” increases; they are
simply 0 ms. I’m using a ‘collect’ command and can see the synthetic result
arrays when I use a spark-shell.

Regards,

Hans



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Scheduler-delay-vs-Getting-result-time-tp23752.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



What is faster for SQL table storage, On-Heap or off-heap?

2015-07-09 Thread Brandon White
Is the read / aggregate performance better when caching Spark SQL tables
on-heap with sqlContext.cacheTable() or off heap by saving it to Tachyon?

Has anybody tested this? Any theories?


Re: Spark Streaming Hangs on Start

2015-07-09 Thread Tathagata Das
1. There will be a long-running job with the description start(), as that is
the job that is running the receivers. It will never end.

2. You need to set the number of cores given to the Spark executors by the
YARN container. That is the SparkConf spark.executor.cores, or --executor-cores
in spark-submit. Since it is 1 by default, your only container has one core,
which is occupied by the receiver, leaving no cores to run the map tasks.
So the map stage is blocked.
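
For example (a sketch; the numbers are illustrative, anything that leaves
cores free beyond the receivers will do):

spark-submit --executor-cores 4 --num-executors 2 ...   # plus your usual options
# or, equivalently, in code: sparkConf.set("spark.executor.cores", "4")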

3. Note these log lines, especially "15/07/09 18:29:00 INFO
receiver.ReceiverSupervisorImpl: Received stop signal". I think somehow
your streaming context is being shut down too early, which is causing the
KafkaReceiver to stop. Something you should debug.


15/07/09 18:27:13 INFO consumer.ConsumerFetcherThread:
[ConsumerFetcherThread-adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201-0-42],
Starting
15/07/09 18:27:13 INFO consumer.ConsumerFetcherManager:
[ConsumerFetcherManager-1436437633199] Added fetcher for partitions
ArrayBuffer([[adhoc_data,0], initOffset 53 to broker
id:42,host:szq1.appadhoc.com,port:9092] )
15/07/09 18:27:13 INFO storage.MemoryStore: ensureFreeSpace(1680)
called with curMem=96628, maxMem=16669841817
15/07/09 18:27:13 INFO storage.MemoryStore: Block
input-0-1436437633600 stored as bytes in memory (estimated size 1680.0
B, free 15.5 GB)
15/07/09 18:27:13 WARN storage.BlockManager: Block
input-0-1436437633600 replicated to only 0 peer(s) instead of 1 peers
15/07/09 18:27:14 INFO receiver.BlockGenerator: Pushed block input-0-1436437633600
15/07/09 18:29:00 INFO receiver.ReceiverSupervisorImpl: Received stop signal
15/07/09 18:29:00 INFO receiver.ReceiverSupervisorImpl: Stopping
receiver with message: Stopped by driver:
15/07/09 18:29:00 INFO consumer.ZookeeperConsumerConnector:
[adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201],
ZKConsumerConnector shutting down
15/07/09 18:29:00 INFO consumer.ConsumerFetcherManager:
[ConsumerFetcherManager-1436437633199] Stopping leader finder thread
15/07/09 18:29:00 INFO
consumer.ConsumerFetcherManager$LeaderFinderThread:
[adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201-leader-finder-thread],
Shutting down
15/07/09 18:29:00 INFO
consumer.ConsumerFetcherManager$LeaderFinderThread:
[adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201-leader-finder-thread],
Stopped
15/07/09 18:29:00 INFO
consumer.ConsumerFetcherManager$LeaderFinderThread:
[adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201-leader-finder-thread],
Shutdown completed
15/07/09 18:29:00 INFO consumer.ConsumerFetcherManager:
[ConsumerFetcherManager-1436437633199] Stopping all fetchers
15/07/09 18:29:00 INFO consumer.ConsumerFetcherThread:
[ConsumerFetcherThread-adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201-0-42],
Shutting down
15/07/09 18:29:01 INFO consumer.SimpleConsumer: Reconnect due to
socket error: java.nio.channels.ClosedByInterruptException
15/07/09 18:29:01 INFO consumer.ConsumerFetcherThread:
[ConsumerFetcherThread-adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201-0-42],
Stopped
15/07/09 18:29:01 INFO consumer.ConsumerFetcherThread:
[ConsumerFetcherThread-adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201-0-42],
Shutdown completed
15/07/09 18:29:01 INFO consumer.ConsumerFetcherManager:
[ConsumerFetcherManager-1436437633199] All connections stopped
15/07/09 18:29:01 INFO zkclient.ZkEventThread: Terminate ZkClient event thread.
15/07/09 18:29:01 INFO zookeeper.ZooKeeper: Session: 0x14e70eedca00315 closed
15/07/09 18:29:01 INFO zookeeper.ClientCnxn: EventThread shut down
15/07/09 18:29:01 INFO consumer.ZookeeperConsumerConnector:
[adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201],
ZKConsumerConnector shutdown completed in 74 ms
15/07/09 18:29:01 INFO receiver.ReceiverSupervisorImpl: Called receiver onStop
15/07/09 18:29:01 INFO receiver.ReceiverSupervisorImpl: Deregistering receiver 0


Re: How to ignore features in mllib

2015-07-09 Thread Burak Yavuz
If you use the Pipelines API with DataFrames, you select which columns you
would like to train on using the VectorAssembler. While using the
VectorAssembler, you can choose not to select some features if you like.
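
A minimal sketch (the column names are illustrative):

import org.apache.spark.ml.feature.VectorAssembler

// Only f1/f2/f3 are assembled into the feature vector; any pass-through
// columns (e.g. an id or a String tag) simply stay on the DataFrame.
val assembler = new VectorAssembler()
  .setInputCols(Array("f1", "f2", "f3"))
  .setOutputCol("features")
// val assembled = assembler.transform(df)  // keeps all original columns, adds "features"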

Best,
Burak

On Thu, Jul 9, 2015 at 10:38 AM, Arun Luthra arun.lut...@gmail.com wrote:

 Is it possible to ignore features in mllib? In other words, I would like
 to have some 'pass-through' data, Strings for example, attached to training
 examples and test data.

 A related stackoverflow question:
 http://stackoverflow.com/questions/30739283/spark-mllib-how-to-ignore-features-when-training-a-classifier

 Arun



Re: Remote spark-submit not working with YARN

2015-07-09 Thread Juan Gordon
Hi ,

I checked the logs and it looks like YARN is trying to communicate with my
test server through the local IP (the Spark cluster and my test server are
in different VPCs in Amazon EC2), and that's why YARN can't respond.

I tried the same script in yarn-cluster mode and it runs correctly that way.

So I think my issue is solved by using yarn-cluster.

Thanks a lot,

JG

On Wed, Jul 8, 2015 at 7:24 PM, canan chen ccn...@gmail.com wrote:

 The application is accepted by YARN RM. Just as Sandy mentioned, please
 look at the RM log, there must be some useful info there.

 On Thu, Jul 9, 2015 at 7:27 AM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 Strange.  Does the application show up at all in the YARN web UI?  Does 
 application_1436314873375_0030
 show up at all in the YARN ResourceManager logs?

 -Sandy

 On Wed, Jul 8, 2015 at 3:32 PM, Juan Gordon jgordo...@gmail.com wrote:

 Hello Sandy,

 Yes I'm sure that YARN has the enought resources, i checked it in the
 WEB UI page of my cluster

 Also, i'm able to submit the same script in any of the nodes of the
 cluster.

 That's why i don't understand whats happening.

 Thanks

 JG

 On Wed, Jul 8, 2015 at 5:26 PM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 Hi JG,

 One way this can occur is that YARN doesn't have enough resources to
 run your job.  Have you verified that it does?  Are you able to submit
 using the same command from a node on the cluster?

 -Sandy

 On Wed, Jul 8, 2015 at 3:19 PM, jegordon jgordo...@gmail.com wrote:

 I'm trying to submit a spark job from a different server outside of my
 Spark
 Cluster (running spark 1.4.0, hadoop 2.4.0 and YARN) using the
 spark-submit
 script :

 spark/bin/spark-submit --master yarn-client --executor-memory 4G
 myjobScript.py

 The thing is that my application never gets past the ACCEPTED state; it
 is stuck on it:

 15/07/08 16:49:40 INFO Client: Application report for
 application_1436314873375_0030 (state: ACCEPTED)
 15/07/08 16:49:41 INFO Client: Application report for
 application_1436314873375_0030 (state: ACCEPTED)
 15/07/08 16:49:42 INFO Client: Application report for
 application_1436314873375_0030 (state: ACCEPTED)
 15/07/08 16:49:43 INFO Client: Application report for
 application_1436314873375_0030 (state: ACCEPTED)
 15/07/08 16:49:44 INFO Client: Application report for
 application_1436314873375_0030 (state: ACCEPTED)
 15/07/08 16:49:45 INFO Client: Application report for
 application_1436314873375_0030 (state: ACCEPTED)
 15/07/08 16:49:46 INFO Client: Application report for
 application_1436314873375_0030 (state: ACCEPTED)
 15/07/08 16:49:47 INFO Client: Application report for
 application_1436314873375_0030 (state: ACCEPTED)
 15/07/08 16:49:48 INFO Client: Application report for
 application_1436314873375_0030 (state: ACCEPTED)
 15/07/08 16:49:49 INFO Client: Application report for
 application_1436314873375_0030 (state: ACCEPTED)

 But if I execute the same script with spark-submit on the master
 server of
 my cluster, it runs correctly.

 I already set the yarn configuration in the remote server in
 $YARN_CONF_DIR/yarn-site.xml like this :

  <property>
    <name>yarn.resourcemanager.hostname</name>
    <value>54.54.54.54</value>
  </property>

  <property>
    <name>yarn.resourcemanager.address</name>
    <value>54.54.54.54:8032</value>
    <description>Enter your ResourceManager hostname.</description>
  </property>

  <property>
    <name>yarn.resourcemanager.scheduler.address</name>
    <value>54.54.54.54:8030</value>
    <description>Enter your ResourceManager hostname.</description>
  </property>

  <property>
    <name>yarn.resourcemanager.resourcetracker.address</name>
    <value>54.54.54.54:8031</value>
    <description>Enter your ResourceManager hostname.</description>
  </property>
 Where 54.54.54.54 is the IP of my resourcemanager node.

 Why is this happening? Do I have to configure something else in YARN to
 accept remote submits? Or what am I missing?

 Thanks a lot

 JG




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Remote-spark-submit-not-working-with-YARN-tp23728.html
 Sent from the Apache Spark User List mailing list archive at
 Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





 --
 Saludos,
 Juan Gordon






-- 
Saludos,
Juan Gordon


Re: spark ec2 as non-root / any plan to improve that in the future ?

2015-07-09 Thread Nicholas Chammas
No plans to change that at the moment, but agreed it is against accepted
convention. It would be a lot of work to change the tool, change the AMIs,
and test everything. My suggestion is not to hold your breath for such a
change.

spark-ec2, as far as I understand, is not intended for spinning up
permanent or production infrastructure (though people may use it for those
purposes), so there isn't a big impetus to fix this kind of issue. It works
really well for what it was intended for: spinning up clusters for testing,
prototyping, and experimenting.

Nick

On Thu, Jul 9, 2015 at 3:25 AM matd matd...@gmail.com wrote:

 Hi,

 Spark ec2 scripts are useful, but they install everything as root.
 AFAIK, it's not a good practice ;-)

 Why is it so ?
 Should these scripts be reserved for test/demo purposes, and not be used
 for
 a production system?
 Is it planned in some roadmap to improve that, or to replace ec2-scripts
 with something else ?

 Would it be difficult to change them to use a sudo-er instead ?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/spark-ec2-as-non-root-any-plan-to-improve-that-in-the-future-tp23734.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Caching in spark

2015-07-09 Thread vinod kumar
Hi Guys,

Can anyone please share how to use the caching feature of Spark via Spark
SQL queries?

-Vinod
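
For reference, a minimal sketch of caching through Spark SQL itself (the table
name is illustrative):

sqlContext.sql("CACHE TABLE my_table")            // caches the table in memory
sqlContext.sql("SELECT COUNT(*) FROM my_table")   // later queries read the cached data
sqlContext.sql("UNCACHE TABLE my_table")
// programmatic equivalent: sqlContext.cacheTable("my_table") / sqlContext.uncacheTable("my_table")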


SPARK vs SQL

2015-07-09 Thread vinod kumar
Hi Everyone,

Is there any document/material which compares Spark with SQL Server?

If so please share me the details.

Thanks,
Vinod


Numer of runJob at SparkPlan.scala:122 in Spark SQL

2015-07-09 Thread Wojciech Pituła
Hey,

I was wondering if it is possible to tune the number of jobs generated by Spark
SQL. Currently my query generates over 80 "runJob at SparkPlan.scala:122"
jobs; every one of them gets executed in ~4 sec and contains only 5 tasks.
As a result, most of my cores do nothing.


Performance slow

2015-07-09 Thread Ravisankar Mani
Hi everyone,

The query takes more time when I execute it using (group by +
order by) or (group by + cast + order by) in the same query. Kindly refer to
the following query.

Could you please provide any solution regarding this performance issue?



SELECT IF(ISNOTNULL(SUM(CAST(adventurepersoncontacts.contactid AS
decimal(38,6)))), SUM(CAST(adventurepersoncontacts.contactid AS
decimal(38,6))), 0) AS adventurepersoncontacts_contactid,
adventurepersoncontacts.fullname AS adventurepersoncontacts_fullname FROM
default.adventurepersoncontacts AS adventurepersoncontacts ORDER BY
adventurepersoncontacts.fullname ASC

Regards,
Ravisankar M R


RE: [SparkSQL] Incorrect ROLLUP results

2015-07-09 Thread Cheng, Hao
Never mind, I’ve created the jira issue at 
https://issues.apache.org/jira/browse/SPARK-8972.

From: Cheng, Hao [mailto:hao.ch...@intel.com]
Sent: Friday, July 10, 2015 9:15 AM
To: yana.kadiy...@gmail.com; ayan guha
Cc: user
Subject: RE: [SparkSQL] Incorrect ROLLUP results

Yes, this is a bug, do you mind to create a jira issue for this? I will fix 
this asap.

BTW, what’s your spark version?

From: Yana Kadiyska [mailto:yana.kadiy...@gmail.com]
Sent: Friday, July 10, 2015 12:16 AM
To: ayan guha
Cc: user
Subject: Re: [SparkSQL] Incorrect ROLLUP results


+---+---+---+
|cnt|_c1|grp|
+---+---+---+
|  1| 31|  0|
|  1| 31|  1|
|  1|  4|  0|
|  1|  4|  1|
|  1| 42|  0|
|  1| 42|  1|
|  1| 15|  0|
|  1| 15|  1|
|  1| 26|  0|
|  1| 26|  1|
|  1| 37|  0|
|  1| 10|  0|
|  1| 37|  1|
|  1| 10|  1|
|  1| 48|  0|
|  1| 21|  0|
|  1| 48|  1|
|  1| 21|  1|
|  1| 32|  0|
|  1| 32|  1|
+---+---+---+

On Thu, Jul 9, 2015 at 11:54 AM, ayan guha 
guha.a...@gmail.commailto:guha.a...@gmail.com wrote:

Can you please post result of show()?
On 10 Jul 2015 01:00, Yana Kadiyska 
yana.kadiy...@gmail.commailto:yana.kadiy...@gmail.com wrote:
Hi folks, I just re-wrote a query from using UNION ALL to use with rollup and 
I'm seeing some unexpected behavior. I'll open a JIRA if needed but wanted to 
check if this is user error. Here is my code:


case class KeyValue(key: Int, value: String)

val df = sc.parallelize(1 to 50).map(i=KeyValue(i, i.toString)).toDF



df.registerTempTable(foo)



sqlContext.sql(“select count(*) as cnt, value as key,GROUPING__ID from foo 
group by value with rollup”).show(100)





sqlContext.sql(“select count(*) as cnt, key % 100 as key,GROUPING__ID from foo 
group by key%100 with rollup”).show(100)
​

Grouping by value does the right thing, I get one group 0 with the overall 
count. But grouping by expression (key%100) produces weird results -- appears 
that group 1 results are replicated as group 0. Am I doing something wrong or 
is this a bug?



Re: [X-post] Saving SparkSQL result RDD to Cassandra

2015-07-09 Thread Su She
Thanks Todd, this was helpful! I also got some help from the other forum,
and for those that might run into this problem in the future, the solution
that worked for me was:

foreachRDD { r => r.map(x => data(x.getString(0),
x.getInt(1))).saveToCassandra("demo", "sqltest") }

On Thu, Jul 9, 2015 at 4:37 PM, Todd Nist tsind...@gmail.com wrote:

 foreachRDD returns a unit:

 def foreachRDD(foreachFunc: (RDD[T]) ⇒ Unit): Unit
 (RDD: https://spark.apache.org/docs/latest/api/scala/org/apache/spark/rdd/RDD.html)

 Apply a function to each RDD in this DStream. This is an output operator,
 so 'this' DStream will be registered as an output stream and therefore
 materialized.

 Change it to a map, foreach or some other form of transform.

 HTH

 -Todd


 On Thu, Jul 9, 2015 at 5:24 PM, Su She suhsheka...@gmail.com wrote:

 Hello All,

 I also posted this on the Spark/Datastax thread, but thought it was also
 50% a spark question (or mostly a spark question).

 I was wondering what is the best practice to saving streaming Spark SQL (
 https://github.com/Intel-bigdata/spark-streamingsql/blob/master/src/main/scala/org/apache/spark/sql/streaming/examples/KafkaDDL.scala)
 results to Cassandra?

 The query looks like this:

 streamSqlContext.sql(
   """
     |SELECT t.word, COUNT(t.word)
     |FROM (SELECT * FROM t_kafka) OVER (WINDOW '9' SECONDS, SLIDE '3' SECONDS) AS t
     |GROUP BY t.word
   """.stripMargin)
   .foreachRDD { r => r.toString() }.map(x =>
 x.split(",")).map(x => data(x(0), x(1))).saveToCassandra("demo", "sqltest")

 I’m getting a message saying map isn’t a member of Unit.

 I thought since I'm converting it to a string I can call a map/save to
 Cassandra function there, but it seems like I can't call map after
 r.toString()?

 Please let me know if this is possible and what is the best way of doing
 this. Thank you for the help!

 -Su





Re: Spark serialization in closure

2015-07-09 Thread Chen Song
Thanks Andrew.

I tried with your suggestions and (2) works for me. (1) still doesn't work.

Chen

On Thu, Jul 9, 2015 at 4:58 PM, Andrew Or and...@databricks.com wrote:

 Hi Chen,

 I believe the issue is that `object foo` is a member of `object testing`,
 so the only way to access `object foo` is to first pull `object testing`
 into the closure, then access a pointer to get to `object foo`. There are
 two workarounds that I'm aware of:

 (1) Move `object foo` outside of `object testing`. This is only a problem
 because of the nested objects. Also, by design it's simpler to reason about
 but that's a separate discussion.

 (2) Create a local variable for `foo.v`. If all your closure cares about
 is the integer, then it makes sense to add a `val v = foo.v` inside `func`
 and use this in your closure instead. This avoids pulling in $outer
 pointers into your closure at all since it only references local variables.

 As others have commented, I think this is more of a Scala problem than a
 Spark one.
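
 A minimal sketch of workaround (2), using the rdd and foo from the snippet
 quoted further down:

 def func = {
   val v = foo.v                      // local val: the closure captures only an Int
   val after = rdd.foreachPartition { it =>
     println(v)                       // no $outer reference to `testing` is pulled in
   }
 }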

 Let me know if these work,
 -Andrew

 2015-07-09 13:36 GMT-07:00 Richard Marscher rmarsc...@localytics.com:

 Reading that article and applying it to your observations of what happens
 at runtime:

 shouldn't the closure require serializing testing? The foo singleton
 object is a member of testing, and then you call this foo value in the
 closure func and further in the foreachPartition closure. So following
 that article, Scala will attempt to serialize the containing object/class
 testing to get the foo instance.

 On Thu, Jul 9, 2015 at 4:11 PM, Chen Song chen.song...@gmail.com wrote:

 Repost the code example,

 object testing extends Serializable {
 object foo {
   val v = 42
 }
 val list = List(1,2,3)
 val rdd = sc.parallelize(list)
 def func = {
   val after = rdd.foreachPartition {
 it => println(foo.v)
   }
 }
   }

 On Thu, Jul 9, 2015 at 4:09 PM, Chen Song chen.song...@gmail.com
 wrote:

 Thanks Erik. I saw the document too. That is why I am confused: as
 per the article, it should be fine as long as foo is
 serializable. However, what I have seen is that it would work if
 testing is serializable, even if foo is not serializable, as shown
 below. I don't know if there is something specific to Spark.

 For example, the code example below works.

 object testing extends Serializable {

 object foo {

   val v = 42

 }

 val list = List(1,2,3)

 val rdd = sc.parallelize(list)

 def func = {

   val after = rdd.foreachPartition {

 it = println(foo.v)

   }

 }

   }

 On Thu, Jul 9, 2015 at 12:11 PM, Erik Erlandson e...@redhat.com wrote:

 I think you have stumbled across this idiosyncrasy:


 http://erikerlandson.github.io/blog/2015/03/31/hygienic-closures-for-scala-function-serialization/




 - Original Message -
  I am not sure this is more of a question for Spark or just Scala but
 I am
  posting my question here.
 
  The code snippet below shows an example of passing a reference to a
 closure
  in rdd.foreachPartition method.
 
  ```
  object testing {
  object foo extends Serializable {
val v = 42
  }
  val list = List(1,2,3)
  val rdd = sc.parallelize(list)
  def func = {
val after = rdd.foreachPartition {
  it = println(foo.v)
}
  }
}
  ```
  When running this code, I got an exception
 
  ```
  Caused by: java.io.NotSerializableException:
  $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$
  Serialization stack:
  - object not serializable (class:
  $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$, value:
  $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$@10b7e824)
  - field (class:
 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$$anonfun$1,
  name: $outer, type: class
 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$)
  - object (class
 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$$anonfun$1,
  function1)
  ```
 
  It looks like Spark needs to serialize `testing` object. Why is it
  serializing testing even though I only pass foo (another serializable
  object) in the closure?
 
  A more general question is, how can I prevent Spark from serializing
 the
  parent class where the RDD is defined, while still supporting passing in
  functions defined in other classes?
 
  --
  Chen Song
 




 --
 Chen Song




 --
 Chen Song




 --
 --
 *Richard Marscher*
 Software Engineer
 Localytics
 Localytics.com http://localytics.com/ | Our Blog
 http://localytics.com/blog | Twitter http://twitter.com/localytics |
 Facebook http://facebook.com/localytics | LinkedIn
 http://www.linkedin.com/company/1148792?trk=tyah





-- 
Chen Song


Re: work around Size exceeds Integer.MAX_VALUE

2015-07-09 Thread Michal Čizmazia
Thanks Matei! It worked.

On 9 July 2015 at 19:43, Matei Zaharia matei.zaha...@gmail.com wrote:

 This means that one of your cached RDD partitions is bigger than 2 GB of
 data. You can fix it by having more partitions. If you read data from a
 file system like HDFS or S3, set the number of partitions higher in the
 sc.textFile, hadoopFile, etc. methods (it's an optional second parameter to
 those methods). If you create it through parallelize, or if this particular
 RDD comes from a shuffle, use more tasks in the parallelize or shuffle.
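
 A minimal sketch of that suggestion (the numbers and path are illustrative):

 // read with more partitions up front (optional minPartitions argument)...
 val rdd = sc.textFile("hdfs://.../input", 2000)
 // ...or split an existing or shuffled RDD into more, smaller partitions
 val smaller = rdd.repartition(2000)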

 Matei

 On Jul 9, 2015, at 3:35 PM, Michal Čizmazia mici...@gmail.com wrote:

 Spark version 1.4.0 in the Standalone mode

 2015-07-09 20:12:02 INFO  (sparkDriver-akka.actor.default-dispatcher-3)
 BlockManagerInfo:59 - Added rdd_0_0 on disk on localhost:51132 (size: 29.8
 GB)
 2015-07-09 20:12:02 ERROR (Executor task launch worker-0) Executor:96 -
 Exception in task 0.0 in stage 0.0 (TID 0)
 java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
 at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:836)
 at
 org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
 at
 org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113)
 at
 org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285)
 at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:127)
 at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)
 at
 org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:509)
 at
 org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:427)
 at
 org.apache.spark.storage.BlockManager.get(BlockManager.scala:615)
 at
 org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:154)
 at
 org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
 at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at
 org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
 at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at
 org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:70)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)


 On 9 July 2015 at 18:11, Ted Yu yuzhih...@gmail.com wrote:

 Which release of Spark are you using ?

 Can you show the complete stack trace ?

 getBytes() could be called from:
 getBytes(file, 0, file.length)
 or:
 getBytes(segment.file, segment.offset, segment.length)

 Cheers

 On Thu, Jul 9, 2015 at 2:50 PM, Michal Čizmazia mici...@gmail.com
 wrote:

 Please could anyone give me pointers for appropriate SparkConf to work
 around Size exceeds Integer.MAX_VALUE?

 Stacktrace:

 2015-07-09 20:12:02 INFO  (sparkDriver-akka.actor.default-dispatcher-3)
 BlockManagerInfo:59 - Added rdd_0_0 on disk on localhost:51132 (size: 29.8
 GB)
 2015-07-09 20:12:02 ERROR (Executor task launch worker-0) Executor:96 -
 Exception in task 0.0 in stage 0.0 (TID 0)
 java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
 at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:836)
 at
 org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
 ...







Re: Data Processing speed SQL Vs SPARK

2015-07-09 Thread vinod kumar
For records below 50,000, SQL is better, right?


On Fri, Jul 10, 2015 at 12:18 AM, ayan guha guha.a...@gmail.com wrote:

 With your load, either should be fine.

 I would suggest you to run couple of quick prototype.

 Best
 Ayan

 On Fri, Jul 10, 2015 at 2:06 PM, vinod kumar vinodsachin...@gmail.com
 wrote:

 Ayan,

 I would want to process data that is roughly between 5 and 2L records
 (flat).

 Is there any scale threshold for deciding which technology is best, either
 SQL or Spark?



 On Thu, Jul 9, 2015 at 9:40 AM, ayan guha guha.a...@gmail.com wrote:

 It depends on workload. How much data you would want to process?
 On 9 Jul 2015 22:28, vinod kumar vinodsachin...@gmail.com wrote:

 Hi Everyone,

 I am new to spark.

 Am using SQL in my application to handle data in my application.I have
 a thought to move to spark now.

 Is data processing speed of spark better than SQL server?

 Thank,
 Vinod





 --
 Best Regards,
 Ayan Guha



RE: [SparkSQL] Incorrect ROLLUP results

2015-07-09 Thread Cheng, Hao
Yes, this is a bug. Do you mind creating a JIRA issue for it? I will fix
this ASAP.

BTW, what’s your spark version?

From: Yana Kadiyska [mailto:yana.kadiy...@gmail.com]
Sent: Friday, July 10, 2015 12:16 AM
To: ayan guha
Cc: user
Subject: Re: [SparkSQL] Incorrect ROLLUP results


+---+---+---+
|cnt|_c1|grp|
+---+---+---+
|  1| 31|  0|
|  1| 31|  1|
|  1|  4|  0|
|  1|  4|  1|
|  1| 42|  0|
|  1| 42|  1|
|  1| 15|  0|
|  1| 15|  1|
|  1| 26|  0|
|  1| 26|  1|
|  1| 37|  0|
|  1| 10|  0|
|  1| 37|  1|
|  1| 10|  1|
|  1| 48|  0|
|  1| 21|  0|
|  1| 48|  1|
|  1| 21|  1|
|  1| 32|  0|
|  1| 32|  1|
+---+---+---+

On Thu, Jul 9, 2015 at 11:54 AM, ayan guha 
guha.a...@gmail.commailto:guha.a...@gmail.com wrote:

Can you please post result of show()?
On 10 Jul 2015 01:00, Yana Kadiyska 
yana.kadiy...@gmail.commailto:yana.kadiy...@gmail.com wrote:
Hi folks, I just re-wrote a query from using UNION ALL to use with rollup and 
I'm seeing some unexpected behavior. I'll open a JIRA if needed but wanted to 
check if this is user error. Here is my code:


case class KeyValue(key: Int, value: String)

val df = sc.parallelize(1 to 50).map(i => KeyValue(i, i.toString)).toDF

df.registerTempTable("foo")

sqlContext.sql("select count(*) as cnt, value as key, GROUPING__ID from foo group by value with rollup").show(100)

sqlContext.sql("select count(*) as cnt, key % 100 as key, GROUPING__ID from foo group by key % 100 with rollup").show(100)

Grouping by value does the right thing, I get one group 0 with the overall 
count. But grouping by expression (key%100) produces weird results -- appears 
that group 1 results are replicated as group 0. Am I doing something wrong or 
is this a bug?
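
A possible workaround sketch while this is being looked at (assuming only grouping by an
expression is affected, since the plain-column rollup above behaves correctly): materialize
the expression as a named column first and roll up on that column. The table and column
names below are illustrative.

// Hypothetical workaround: pre-compute key % 100 as a column, then group by the column.
val withMod = sqlContext.sql("select key % 100 as keymod, value from foo")
withMod.registerTempTable("foo_mod")
sqlContext.sql(
  "select count(*) as cnt, keymod, GROUPING__ID from foo_mod group by keymod with rollup"
).show(100)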



Re: Data Processing speed SQL Vs SPARK

2015-07-09 Thread vinod kumar
Ayan,

I would want to process a data which  nearly around 5 records to 2L
records(in flat).

Is there is any scaling is there to decide what technology is best?either
SQL or SPARK?



On Thu, Jul 9, 2015 at 9:40 AM, ayan guha guha.a...@gmail.com wrote:

 It depends on workload. How much data you would want to process?
 On 9 Jul 2015 22:28, vinod kumar vinodsachin...@gmail.com wrote:

 Hi Everyone,

 I am new to spark.

 Am using SQL in my application to handle data in my application.I have a
 thought to move to spark now.

 Is data processing speed of spark better than SQL server?

 Thank,
 Vinod




Re: Data Processing speed SQL Vs SPARK

2015-07-09 Thread ayan guha
With your load, either should be fine.

I would suggest you to run couple of quick prototype.

Best
Ayan

On Fri, Jul 10, 2015 at 2:06 PM, vinod kumar vinodsachin...@gmail.com
wrote:

 Ayan,

 I would want to process a data which  nearly around 5 records to 2L
 records(in flat).

 Is there is any scaling is there to decide what technology is best?either
 SQL or SPARK?



 On Thu, Jul 9, 2015 at 9:40 AM, ayan guha guha.a...@gmail.com wrote:

 It depends on workload. How much data you would want to process?
 On 9 Jul 2015 22:28, vinod kumar vinodsachin...@gmail.com wrote:

 Hi Everyone,

 I am new to spark.

 Am using SQL in my application to handle data in my application.I have a
 thought to move to spark now.

 Is data processing speed of spark better than SQL server?

 Thank,
 Vinod





-- 
Best Regards,
Ayan Guha


[Spark Hive SQL] Set the hive connection in hive context is broken in spark 1.4.1-rc1?

2015-07-09 Thread Terry Hole
Hi,

I am trying to set the hive metadata destination to a mysql database in
hive context. It works fine in spark 1.3.1, but it seems broken in spark
1.4.1-rc1, where it always connects to the default local metastore. Is this
a regression, or must we set the connection in hive-site.xml?

The code is very simple in spark shell:
import org.apache.spark.sql.hive._
val hiveContext = new HiveContext(sc)
hiveContext.setConf("javax.jdo.option.ConnectionDriveName", "com.mysql.jdbc.Driver")
hiveContext.setConf("javax.jdo.option.ConnectionUserName", "hive")
hiveContext.setConf("javax.jdo.option.ConnectionPassword", "hive")
hiveContext.setConf("javax.jdo.option.ConnectionURL", "jdbc:mysql://10.111.3.186:3306/hive")
hiveContext.setConf("hive.metastore.warehouse.dir", "/user/hive/warehouse")
hiveContext.sql("select * from mysqltable").show()

Thanks!
-Terry


Apache Spark : Custom function for reduceByKey - missing arguments for method

2015-07-09 Thread ameyamm
I am trying to normalize a dataset (convert values for all attributes in the
vector to 0-1 range). I created an RDD of tuple (attrib-name,
attrib-value) for all the records in the dataset as follows:

val attribMap : RDD[(String, DoubleDimension)] = contactDataset.flatMap(
  contact => {
    List(
      ("dage",      contact.dage      match { case Some(value) => DoubleDimension(value); case None => null }),
      ("dancstry1", contact.dancstry1 match { case Some(value) => DoubleDimension(value); case None => null }),
      ("dancstry2", contact.dancstry2 match { case Some(value) => DoubleDimension(value); case None => null }),
      ("ddepart",   contact.ddepart   match { case Some(value) => DoubleDimension(value); case None => null }),
      ("dhispanic", contact.dhispanic match { case Some(value) => DoubleDimension(value); case None => null }),
      ("dhour89",   contact.dhour89   match { case Some(value) => DoubleDimension(value); case None => null })
    )
  }
)

Here, contactDataset is of the type RDD[Contact]. The fields of Contact
class are of type Option[Long].

DoubleDimension is a simple wrapper over Double datatype. It extends the
Ordered trait and implements corresponding compare method and equals method.

To obtain the max and min attribute vector for computing the normalized
values,

maxVector = attribMap.reduceByKey( getMax )
minVector = attribMap.reduceByKey( getMin )

Implementation of getMax and getMin is as follows:

def getMax( a : DoubleDimension, b : DoubleDimension ) : DoubleDimension = {
  if (a > b) a
  else b
}

def getMin( a : DoubleDimension, b : DoubleDimension) : DoubleDimension = {
  if (a < b) a
  else b
}

I get a compile error at calls to the methods getMax and getMin stating:

[ERROR] .../com/ameyamm/input_generator/DatasetReader.scala:117: error:
missing arguments for method getMax in class DatasetReader;

[ERROR] follow this method with '_' if you want to treat it as a partially
applied function

[ERROR] maxVector = attribMap.reduceByKey( getMax )

[ERROR] .../com/ameyamm/input_generator/DatasetReader.scala:118: error:
missing arguments for method getMin in class DatasetReader;

[ERROR] follow this method with '_' if you want to treat it as a partially
applied function

[ERROR] minVector = attribMap.reduceByKey( getMin )

I am not sure what I am doing wrong here. My RDD is an RDD of pairs and, as
far as I know, I can pass any method to it as long as the function is of
the type f: (V, V) => V.

I am really stuck here. Please help.
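
For what it is worth, a sketch of the two usual fixes for this compiler error, which is
exactly what the "follow this method with '_'" hint is pointing at:

// 1) Eta-expand the methods explicitly when passing them to reduceByKey:
maxVector = attribMap.reduceByKey(getMax _)
minVector = attribMap.reduceByKey(getMin _)

// 2) Or declare the combiners as function values instead of methods:
val getMaxF = (a: DoubleDimension, b: DoubleDimension) => if (a > b) a else b
val getMinF = (a: DoubleDimension, b: DoubleDimension) => if (a < b) a else b
maxVector = attribMap.reduceByKey(getMaxF)
minVector = attribMap.reduceByKey(getMinF)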



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Apache-Spark-Custom-function-for-reduceByKey-missing-arguments-for-method-tp23756.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: PySpark without PySpark

2015-07-09 Thread Sujit Pal
Hi Ashish,

Julian's approach is probably better, but few observations:

1) Your SPARK_HOME should be C:\spark-1.3.0 (not C:\spark-1.3.0\bin).

2) If you have anaconda python installed (I saw that you had set this up in
a separate thread, py4j should be part of the package - at least I think
so. To test this, try in your python repl:
 from py4j.java_gateway import JavaGateway
if it succeeds you already have it.

3) In case Py4J is not installed, the best way to install a new package is
using easy_install or pip. Make sure your path is set up so when you call
python you are calling the anaconda version (in case you have multiple
python versions), then if so, do easy_install py4j - this will install
py4j correctly without any messing around on your part. Install
instructions for py4j available on their site:
http://py4j.sourceforge.net/install.html

4) You should replace the python2 in your 00-setup-script with python,
so you point to the $SPARK_HOME/python directory (C:\spark-1.3.0\python).

-sujit


On Thu, Jul 9, 2015 at 8:26 PM, Ashish Dutt ashish.du...@gmail.com wrote:

 Hello Sujit,
 Many thanks for your response.
 To answer your questions;
 Q1) Do you have SPARK_HOME set up in your environment?- Yes, I do. It is
 SPARK_HOME=C:/spark-1.3.0/bin
 Q2) Is there a python2 or python subdirectory under the root of your
 Spark installation? - Yes, i do have that too. It is called python. To fix
 this problem this is what I did,
 I downloaded py4j-0.8.2.1-src from here
 https://pypi.python.org/pypi/py4j which was not there initially when I
 downloaded the spark package from the official repository. I then put it in
 the lib directory as C:\spark-1.3.0\python\lib. Note I did not extract the
 zip file. I put it in as it is.
 The pyspark folder of the spark-1.3.0 root folder. What I next did was
 copy this file and put it in the  pythonpath. So my python path now reads
 as PYTHONPATH=C:/Python27/

 I then rebooted the computer and a silent prayer :-) Then I opened the
 command prompt and invoked the command pyspark from the bin directory of
 spark and EUREKA, it worked :-)  Attached is the screenshot for the same.
 Now, the problem is with IPython notebook. I cannot get it to work with
 pySpark.
 I have a cluster with 4 nodes using CDH5.4

 I was able to resolve the problem. Now the next challenge was to configure
 it with IPython. Followed the steps as documented in the blog. And I get
 the errors, attached is the screenshot

 @Julian, I tried your method too. Attached is the screenshot of the error
 message 7.png

 Hope you can help me out to fix this problem.
 Thank you for your time.

 Sincerely,
 Ashish Dutt
 PhD Candidate
 Department of Information Systems
 University of Malaya, Lembah Pantai,
 50603 Kuala Lumpur, Malaysia

 On Fri, Jul 10, 2015 at 12:02 AM, Sujit Pal sujitatgt...@gmail.com
 wrote:

 Hi Ashish,

 Your 00-pyspark-setup file looks very different from mine (and from the
 one described in the blog post). Questions:

 1) Do you have SPARK_HOME set up in your environment? Because if not, it
 sets it to None in your code. You should provide the path to your Spark
 installation. In my case I have spark-1.3.1 installed under $HOME/Software
 and the code block under # Configure the environment (or yellow highlight
 in the code below) reflects that.
 2) Is there a python2 or python subdirectory under the root of your Spark
 installation? In my case its python not python2. This contains the
 Python bindings for spark, so the block under # Add the PySpark/py4j to
 the Python path (or green highlight in the code below) adds it to the
 Python sys.path so things like pyspark.SparkContext are accessible in your
 Python environment.

 import os
 import sys

 # Configure the environment
 if 'SPARK_HOME' not in os.environ:
     os.environ['SPARK_HOME'] = "/Users/palsujit/Software/spark-1.3.1"

 # Create a variable for our root path
 SPARK_HOME = os.environ['SPARK_HOME']

 # Add the PySpark/py4j to the Python Path
 sys.path.insert(0, os.path.join(SPARK_HOME, "python", "build"))
 sys.path.insert(0, os.path.join(SPARK_HOME, "python"))

 Hope this fixes things for you.

 -sujit


 On Wed, Jul 8, 2015 at 9:52 PM, Ashish Dutt ashish.du...@gmail.com
 wrote:

 Hi Sujit,
 Thanks for your response.

 So i opened a new notebook using the command ipython notebook --profile
 spark and tried the sequence of commands. i am getting errors. Attached is
 the screenshot of the same.
 Also I am attaching the  00-pyspark-setup.py for your reference. Looks
 like, I have written something wrong here. Cannot seem to figure out, what
 is it?

 Thank you for your help


 Sincerely,
 Ashish Dutt

 On Thu, Jul 9, 2015 at 11:53 AM, Sujit Pal sujitatgt...@gmail.com
 wrote:

 Hi Ashish,

  Nice post.
 Agreed, kudos to the author of the post, Benjamin Benfort of District
 Labs.

  Following your post, I get this problem;
 Again, not my post.

 I did try setting up IPython with the Spark profile for the edX Intro
 to Spark course (because I didn't 

Re: Connecting to nodes on cluster

2015-07-09 Thread Ashish Dutt
Hello Akhil,

Thanks for the response. I will have to figure this out.

Sincerely,
Ashish

On Thu, Jul 9, 2015 at 3:40 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 On Wed, Jul 8, 2015 at 7:31 PM, Ashish Dutt ashish.du...@gmail.com
 wrote:

 Hi,

 We have a cluster with 4 nodes. The cluster uses CDH 5.4 for the past two
 days I have been trying to connect my laptop to the server using spark
 master ip:port but its been unsucessful.
 The server contains data that needs to be cleaned and analysed.
 The cluster and the nodes are on linux environment.
 To connect to the nodes I am usnig SSH

 Question: Would it be better if I work directly on the nodes rather than
 trying to connect my laptop to them ?


 ​- You will be able to connect to master machine in the cloud from your
 laptop​

 ​, but you need to make sure that the master is able to connect back to
 your laptop (may require port forwarding on your router, firewalls etc.)
  ​
 ​

 Question 2: If yes, then can you suggest any python and R IDE that I can
 install on the nodes to make it work?


 ​- Once the master machine is able to connect to your laptop's public ip,
 then you can set the spark.driver.host and spark.driver.port properties and
 your job will get executed on the cluster.
 ​



 Thanks for your help


 Sincerely,
 Ashish Dutt





Re: Breaking lineage and reducing stages in Spark Streaming

2015-07-09 Thread Tathagata Das
If you are continuously unioning RDDs, then you are accumulating ever
increasing data, and you are processing ever increasing amount of data in
every batch. Obviously this is going to not last for very long. You
fundamentally cannot keep processing ever increasing amount of data with
finite resources, isnt it?

On Thu, Jul 9, 2015 at 3:17 AM, Anand Nalya anand.na...@gmail.com wrote:

 Thats from the Streaming tab for Spark 1.4 WebUI.

 On 9 July 2015 at 15:35, Michel Hubert mich...@vsnsystemen.nl wrote:

  Hi,



 I was just wondering how you generated to second image with the charts.

 What product?



 *From:* Anand Nalya [mailto:anand.na...@gmail.com]
 *Sent:* donderdag 9 juli 2015 11:48
 *To:* spark users
 *Subject:* Breaking lineage and reducing stages in Spark Streaming



 Hi,



 I've an application in which an rdd is being updated with tuples coming
 from RDDs in a DStream with following pattern.



 dstream.foreachRDD(rdd => {

   myRDD = myRDD.union(rdd.filter(myfilter)).reduceByKey(_+_)

 })



 I'm using cache() and checkpointin to cache results. Over the time, the
 lineage of myRDD keeps increasing and stages in each batch of dstream keeps
 increasing, even though all the earlier stages are skipped. When the number
 of stages grow big enough, the overall delay due to scheduling delay starts
 increasing. The processing time for each batch is still fixed.



 Following figures illustrate the problem:



 Job execution: https://i.imgur.com/GVHeXH3.png?1


 Delays: https://i.imgur.com/1DZHydw.png?1


 Is there some pattern that I can use to avoid this?



 Regards,

 Anand





Re: Problem in Understanding concept of Physical Cores

2015-07-09 Thread Tathagata Das
Query 1) What Spark runs is tasks in task slots; whatever the mapping of
tasks to physical cores is does not matter. If there are two task slots (2
threads in local mode, or an executor with 2 task slots in distributed
mode), it can only run two tasks concurrently. That is true even if the
task is really not doing much. There is no multiplexing going on between
tasks and task slots. So to answer your query 1, there is 1 thread that is
permanently allocated to the receiver task (a long running task) even if it
does not do much. There is no thread left to process the data that is being
received.

Query 2) I think this is already explained above. The receiver task is
taking the only available slot, leaving nothing for the actual tasks to
execute. This will work fine as long as there is n+1 threads, where n =
number of receivers.

Query 3) The 2nd thread will be running tasks that process the in-memory
blocks of data generated by the receiver running on the first thread. Now
if the operating system underneath has only one core (physical or virtual),
then those two threads will be multiplexing the resources of that core.
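
A minimal sketch of the practical consequence (assuming a single socket receiver; the app
name and port are illustrative): give the local master at least "number of receivers + 1"
threads, so one slot can hold the long-running receiver task and the others can process
the received blocks.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// local[2] = 1 task slot for the receiver + 1 task slot for the processing tasks
val conf = new SparkConf().setMaster("local[2]").setAppName("streaming-demo")
val ssc = new StreamingContext(conf, Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999)   // port is illustrative
lines.print()
ssc.start()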



On Thu, Jul 9, 2015 at 1:41 AM, Aniruddh Sharma asharma...@gmail.com
wrote:

 Thanks for the reply. I still have some confusion. Kindly find my
 understanding below.

 Following is the code

 
 val ssc = new StreamingContext(sc, Seconds(1))
 val lines = ssc.socketTextStream(localhost, )
 lines.print()
 ssc.start()

 

 Case 1: When I launch VM with only 1 core and start spark-shell without
 any parameter then as per above explanation it uses local[*] implicitly and
 it creates 1 thread as VM has 1 core.

 Query 1) But what does it try to execute in that 1 explicit thread ? Does
 Receiver does not get executed or does task does not get executed because
 Receiver is not heavy , i am entering only 1 line so shouldn't same
 physical core be shared with Receiver(internal thread) and thread running
 task ?
 For example-- My VM has 1 physical core and multiple daemons like
 master/worker etc are also working successfully with sharing 1 physical
 core only. Also what I understand is that Executor has a JVM in which
 Receiver is executing as a internal thread and 1 thread (for executing
 task) is created in same JVM but for some reason it does not get CPU.

 Query 2) Extending above mentioned analogy to another case, not in Spark
 Streaming, but normal Spark core. If I read input data with 3 partitions
 with 1 physical core and do some action on it then also 3 tasks should be
 created and each task should be handled in a separate thread inside
 executor JVM. It also works which means single physical core executes 3
 different threads executing 3 tasks for 3 partitions. So why Streaming case
 does not get execute.

 Case 2: When I launch VM with only 1 core and start spark-shell with
 --master local[2] then as per above explanation it uses local[2] implicitly
 and it creates 2 thread but my VM has still 1 physical core

 Query 3) Now when 2 threads are created, but my input data has 1
 partition, so still it requires only 1 task and Receiver is an internal
 thread in Executor JVM. What goes in extra in thread 2 in this case , which
 was not getting executed in above case with 1 thread only. And even if 2
 threads are created , they are still to be executed by same physical core
 so kindly elaborate what is extra processing in extra thread in this case.

 Thanks and Regards
 Aniruddh

 On Thu, Jul 9, 2015 at 4:43 AM, Tathagata Das t...@databricks.com wrote:

 There are several levels of indirection going on here, let me clarify.

 In the local mode, Spark runs tasks (which includes receivers) using the
 number of threads defined in the master (either local, or local[2], or
 local[*]).
 local or local[1] = single thread, so only one task at a time
 local[2] = 2 threads, so two tasks
 local[*] = as many threads as the number cores it can detect through the
 operating system.


 Test 1: When you dont specify master in spark-submit, it uses local[*]
 implicitly, so it uses as many threads as the number of cores that VM has.
 Between 1 and 2 VM cores, the behavior was as expected.
 Test 2: When you specified master as local[2], it used two threads.

 HTH

 TD

 On Wed, Jul 8, 2015 at 4:21 AM, Aniruddh Sharma asharma...@gmail.com
 wrote:

 Hi

 I am new to Spark. Following is the problem that I am facing

 Test 1) I ran a VM on CDH distribution with only 1 core allocated to it
 and I ran simple Streaming example in spark-shell with sending data on 
 port and trying to read it. With 1 core allocated to this nothing happens
 in my streaming program and it does not receive data. Now I restart VM with
 2 cores allocated to it and start spark-shell again and ran Streaming
 example again and this time it works

 Query a): From this test I concluded that 

Re: Spark Streaming Hangs on Start

2015-07-09 Thread Tathagata Das
Do you have enough cores in the configured number of executors in YARN?
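
As a sketch (the numbers are illustrative, not a recommendation): the total number of
executor cores must be at least the number of receivers plus one, otherwise the receivers
occupy every task slot and no batch processing can run.

val conf = new org.apache.spark.SparkConf()
  .set("spark.executor.instances", "2")   // same as spark-submit --num-executors 2
  .set("spark.executor.cores", "2")       // same as spark-submit --executor-cores 2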



On Thu, Jul 9, 2015 at 2:29 AM, Bin Wang wbi...@gmail.com wrote:

 I'm using spark streaming with Kafka, and submit it to YARN cluster with
 mode yarn-cluster. But it hangs at SparkContext.start(). The Kafka config
 is right since it can show some events in Streaming tab of web UI.

 The attached file is the screen shot of the Jobs tab of web UI. The code
 in the main class is:

 object StatCounter {

   val config = ConfigFactory.load()
   val redisUrl = config.getString(redis.url)
   val redisPort = config.getInt(redis.port)
   val zkQuorum = config.getString(kafka.zkQuorum)
   val group = config.getString(kafka.group)
   val topic = config.getString(kafka.topic)
   val threadNum = config.getInt(kafka.threadNum)

   val cache = new RedisCache(redisUrl, redisPort)

   def main(args: Array[String]): Unit = {
 val conf = new SparkConf()
 .setAppName(config.getString(spark.name))
 .set(spark.cassandra.connection.host,
 config.getString(cassandra.host))

 val ssc = new StreamingContext(conf,
 Seconds(config.getInt(spark.interval)))
 ssc.checkpoint(config.getString(spark.checkpoint))
 val storage = new CassandraStorage(adhoc_data, ssc)

 val lines = KafkaUtils.createStream(ssc, zkQuorum, group, Map(topic -
 threadNum)).map(_._2)

 val logs = lines.flatMap(line = Parser.parseBody(line, cache))
 Counter.count(logs, storage)

 sys.ShutdownHookThread {
   println(Gracefully stopping Spark Streaming Application)
   ssc.stop(stopSparkContext = true, stopGracefully = true)
   println(Application stopped)
 }

 ssc.start()
 ssc.awaitTermination()
   }
 }


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org



Re: Breaking lineage and reducing stages in Spark Streaming

2015-07-09 Thread Anand Nalya
The data coming from dstream have the same keys that are in myRDD, so
the reduceByKey
after union keeps the overall tuple count in myRDD fixed. Or even with
fixed tuple count, it will keep consuming more resources?

On 9 July 2015 at 16:19, Tathagata Das t...@databricks.com wrote:

 If you are continuously unioning RDDs, then you are accumulating ever
 increasing data, and you are processing ever increasing amount of data in
 every batch. Obviously this is going to not last for very long. You
 fundamentally cannot keep processing ever increasing amount of data with
 finite resources, isnt it?

 On Thu, Jul 9, 2015 at 3:17 AM, Anand Nalya anand.na...@gmail.com wrote:

 Thats from the Streaming tab for Spark 1.4 WebUI.

 On 9 July 2015 at 15:35, Michel Hubert mich...@vsnsystemen.nl wrote:

  Hi,



 I was just wondering how you generated to second image with the charts.

 What product?



 *From:* Anand Nalya [mailto:anand.na...@gmail.com]
 *Sent:* donderdag 9 juli 2015 11:48
 *To:* spark users
 *Subject:* Breaking lineage and reducing stages in Spark Streaming



 Hi,



 I've an application in which an rdd is being updated with tuples coming
 from RDDs in a DStream with following pattern.



 dstream.foreachRDD(rdd => {

   myRDD = myRDD.union(rdd.filter(myfilter)).reduceByKey(_+_)

 })



 I'm using cache() and checkpointin to cache results. Over the time, the
 lineage of myRDD keeps increasing and stages in each batch of dstream keeps
 increasing, even though all the earlier stages are skipped. When the number
 of stages grow big enough, the overall delay due to scheduling delay starts
 increasing. The processing time for each batch is still fixed.



 Following figures illustrate the problem:



 Job execution: https://i.imgur.com/GVHeXH3.png?1


 Delays: https://i.imgur.com/1DZHydw.png?1


 Is there some pattern that I can use to avoid this?



 Regards,

 Anand






query on Spark + Flume integration using push model

2015-07-09 Thread diplomatic Guru
Hello all,

I'm trying to configure the flume to push data into a sink so that my
stream job could pick up the data. My events are in JSON format, but the
Spark + Flume integration [1] document only refer to Avro sink.

[1] https://spark.apache.org/docs/latest/streaming-flume-integration.html

I looked at some of the examples online, and they all refer to avro type:

agent.sinks.avroSink.type = avro

If I set the type to avro and send the data in JSON, will it work? I'm
unable to try this because the streaming job throws an Avro
'org.apache.flume.source.avro.AvroFlumeEvent' exception.


Please advise how to handle this situation.


many thanks
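
For what it is worth, a sketch of how this is usually handled with the push-based receiver
from the integration guide (host and port below are illustrative and assume an existing
StreamingContext named ssc): the avro sink type is only the transport envelope between
Flume and Spark; the event body is opaque bytes, so a JSON payload is fine and can be
decoded on the Spark side.

import org.apache.spark.streaming.flume.FlumeUtils

// The hostname/port must match what the Flume avro sink is configured to send to.
val flumeStream = FlumeUtils.createStream(ssc, "receiver-host", 4141)
val jsonLines = flumeStream.map(e => new String(e.event.getBody.array(), "UTF-8"))
// jsonLines is a DStream[String]; parse each element with the JSON library of your choice.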


Some BlockManager Doubts

2015-07-09 Thread Dibyendu Bhattacharya
Hi ,

Just would like to clarify few doubts I have how BlockManager behaves .
This is mostly in regards to Spark Streaming Context .

There are two possible cases Blocks may get dropped / not stored in memory

Case 1. While writing the Block for MEMORY_ONLY settings , if Node's
BlockManager does not have enough memory to unroll the block , Block wont
be stored to memory and Receiver will throw error while writing the Block..
If StorageLevel is using Disk ( as in case MEMORY_AND_DISK) , blocks will
be stored to Disk ONLY IF BlockManager not able to unroll to Memory... This
is fine in the case while receiving the blocks , but this logic has a issue
when old Blocks are chosen to be dropped from memory as Case 2

Case 2 : Now let say either for MEMORY_ONLY or MEMORY_AND_DISK settings ,
blocks are successfully stored to Memory in Case 1 . Now what would happen
if memory limit goes beyond a certain threshold, BlockManager start
dropping LRU blocks from memory which was successfully stored while
receiving.

The primary issue I see here: while dropping the blocks in Case 2, Spark
does not check whether the storage level includes disk (MEMORY_AND_DISK), and even
with disk-backed storage levels, blocks are dropped from memory without being written
to disk.
Or maybe the real issue is in Case 1 in the first place: blocks are NOT written to
disk at receive time. I understand writing them eagerly would impact throughput,
but this design may throw BlockNotFound errors if blocks are chosen to be
dropped even when the StorageLevel includes disk.

Any thoughts ?

Regards,
Dibyendu


S3 vs HDFS

2015-07-09 Thread Brandon White
Are there any significant performance differences between reading text
files from S3 and hdfs?


spark ec2 as non-root / any plan to improve that in the future ?

2015-07-09 Thread matd
Hi,

Spark ec2 scripts are useful, but they install everything as root. 
AFAIK, it's not a good practice ;-)

Why is it so ?
Should these scripts be reserved for test/demo purposes, and not be used for
a production system ?
Is it planned in some roadmap to improve that, or to replace ec2-scripts
with something else ?

Would it be difficult to change them to use a sudo-er instead ?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-ec2-as-non-root-any-plan-to-improve-that-in-the-future-tp23734.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Job completed successfully without processing anything

2015-07-09 Thread Akhil Das
Looks like a configuration problem with your spark setup, are you running
the driver on a different network? Can you try a simple program from
spark-shell and make sure your setup is proper? (like sc.parallelize(1 to
1000).collect())

Thanks
Best Regards

On Thu, Jul 9, 2015 at 1:02 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 My job completed in 40 seconds, which is not correct, as there is no output.

 I see
 Exception in thread main akka.actor.ActorNotFound: Actor not found for:
 ActorSelection[Anchor(akka.tcp://sparkDriver@10.115.86.24:54737/),
 Path(/user/OutputCommitCoordinator)]
 at
 akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
 at
 akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
 at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
 at
 akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
 at
 akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
 at
 akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
 at
 akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
 at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
 at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
 at
 akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
 at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
 at
 akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
 at
 scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
 at
 scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
 at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267)
 at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:508)
 at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:541)
 at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:531)
 at
 akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87)
 at akka.remote.EndpointManager$$anonfun$1.applyOrElse(Remoting.scala:575)
 at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
 at akka.remote.EndpointManager.aroundReceive(Remoting.scala:395)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
 at akka.actor.ActorCell.invoke(ActorCell.scala:487)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
 at akka.dispatch.Mailbox.run(Mailbox.scala:220)
 at
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 15/07/08 12:27:31 INFO storage.DiskBlockManager: Shutdown hook called
 15/07/08 12:27:31 INFO util.Utils: Shutdown hook called


 --
 Deepak




spark streaming performance

2015-07-09 Thread Michel Hubert

Hi,

I've developed a POC Spark Streaming application.
But it seems to perform better on my development machine  than on our cluster.
I submit it to yarn on our cloudera cluster.

But my first question is more detailed:

In the application UI (:4040) I see in the streaming section that the batch
processing took 6 sec.
Then when I look at the stages I indeed see a stage with a duration of 5 s.

For example:

Stage ID: 1678 | map at LogonAnalysis.scala:215 | Submitted: 2015/07/09 09:17:00 | Duration: 5 s | Tasks (Succeeded/Total): 50/50 | Shuffle Write: 173.5 KB

But when I look into the details of stage 1678 it tells me the duration was 14
ms and the aggregated metrics by executor show 1.0 s as Task Time.
What is responsible for the gap between 14 ms, 1 s and 5 sec?


Details for Stage 1678
* Total task time across all tasks: 0.8 s
* Shuffle write: 173.5 KB / 2031

Summary Metrics for 50 Completed Tasks

Metric                       | Min         | 25th percentile | Median      | 75th percentile | Max
Duration                     | 14 ms       | 14 ms           | 15 ms       | 15 ms           | 24 ms
GC Time                      | 0 ms        | 0 ms            | 0 ms        | 0 ms            | 0 ms
Shuffle Write Size / Records | 2.6 KB / 28 | 3.1 KB / 35     | 3.5 KB / 42 | 3.9 KB / 46     | 4.4 KB / 53

Aggregated Metrics by Executor

Executor ID | Address | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Shuffle Write Size / Records
2           | :44231  | 1.0 s     | 50          | 0            | 50              | 173.5 KB / 2031








Re: Connecting to nodes on cluster

2015-07-09 Thread Akhil Das
On Wed, Jul 8, 2015 at 7:31 PM, Ashish Dutt ashish.du...@gmail.com wrote:

 Hi,

 We have a cluster with 4 nodes. The cluster uses CDH 5.4 for the past two
 days I have been trying to connect my laptop to the server using spark
 master ip:port but its been unsucessful.
 The server contains data that needs to be cleaned and analysed.
 The cluster and the nodes are on linux environment.
 To connect to the nodes I am usnig SSH

 Question: Would it be better if I work directly on the nodes rather than
 trying to connect my laptop to them ?


​- You will be able to connect to master machine in the cloud from your
laptop​

​, but you need to make sure that the master is able to connect back to
your laptop (may require port forwarding on your router, firewalls etc.)
 ​
​

 Question 2: If yes, then can you suggest any python and R IDE that I can
 install on the nodes to make it work?


​- Once the master machine is able to connect to your laptop's public ip,
then you can set the spark.driver.host and spark.driver.port properties and
your job will get executed on the cluster.
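
A small sketch of what that looks like on the driver side (the master URL, host and port
values are illustrative placeholders):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("spark://<master-ip>:7077")
  .set("spark.driver.host", "<your-laptop-public-ip>")
  .set("spark.driver.port", "51000")   // must be reachable from the cluster nodes
val sc = new SparkContext(conf)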
​



 Thanks for your help


 Sincerely,
 Ashish Dutt




Re: Is there a way to shutdown the derby in hive context in spark shell?

2015-07-09 Thread Akhil Das
Did you try sc.shutdown and creating a new one?

Thanks
Best Regards

On Wed, Jul 8, 2015 at 8:12 PM, Terry Hole hujie.ea...@gmail.com wrote:

 I am using spark 1.4.1rc1 with default hive settings

 Thanks
 - Terry

 Hi All,

 I'd like to use the hive context in spark shell, i need to recreate the
 hive meta database in the same location, so i want to close the derby
 connection previous created in the spark shell, is there any way to do this?

 I try this, but it does not work:

 DriverManager.getConnection("jdbc:derby:;shutdown=true");

 Thanks!

 - Terry





Re: What does RDD lineage refer to ?

2015-07-09 Thread Akhil Das
Yes, just to add see the following scenario of rdd lineage:

RDD1 -> RDD2 -> RDD3 -> RDD4


here RDD2 depends on the RDD1's output and the lineage goes till RDD4.

Now, for some reason RDD3 is lost, and spark will recompute it from RDD2.
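
As a small illustration (a sketch for a spark-shell session where sc exists), the lineage
of an RDD can be inspected directly; Spark keeps this dependency chain so it can replay
only the missing part:

val rdd1 = sc.parallelize(1 to 1000)   // RDD1
val rdd2 = rdd1.map(_ * 2)             // RDD2
val rdd3 = rdd2.filter(_ % 3 == 0)     // RDD3
val rdd4 = rdd3.map(_.toString)        // RDD4
println(rdd4.toDebugString)            // prints the chain RDD4 <- RDD3 <- RDD2 <- RDD1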

Thanks
Best Regards

On Thu, Jul 9, 2015 at 5:51 AM, canan chen ccn...@gmail.com wrote:

 Lots of places refer RDD lineage, I'd like to know what it refer to
 exactly.  My understanding is that it means the RDD dependencies and the
 intermediate MapOutput info in MapOutputTracker.  Correct me if I am wrong.
 Thanks





Re: Re: how to use DoubleRDDFunctions on mllib Vector?

2015-07-09 Thread 诺铁
Ok, got it , thanks.

On Thu, Jul 9, 2015 at 12:02 PM, prosp4300 prosp4...@163.com wrote:



 Seems what Feynman mentioned is the source code instead of documentation,
 vectorMean is private, see

 https://github.com/apache/spark/blob/v1.3.0/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala

 At 2015-07-09 10:10:58, 诺铁 noty...@gmail.com wrote:

 thanks, I understand now.
 but I can't find mllib.clustering.GaussianMixture#vectorMean   , what
 version of spark do you use?

 On Thu, Jul 9, 2015 at 1:16 AM, Feynman Liang fli...@databricks.com
 wrote:

 A RDD[Double] is an abstraction for a large collection of doubles,
 possibly distributed across multiple nodes. The DoubleRDDFunctions are
 there for performing mean and variance calculations across this distributed
 dataset.

 In contrast, a Vector is not distributed and fits on your local machine.
 You would be better off computing these quantities on the Vector directly
 (see mllib.clustering.GaussianMixture#vectorMean for an example of how to
 compute the mean of a vector).

 On Tue, Jul 7, 2015 at 8:26 PM, 诺铁 noty...@gmail.com wrote:

 hi,

 there are some useful functions in DoubleRDDFunctions, which I can use
 if I have RDD[Double], eg, mean, variance.

 Vector doesn't have such methods, how can I convert Vector to
 RDD[Double], or maybe better if I can call mean directly on a Vector?
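
A sketch of both options mentioned above (computing on the local vector directly is
usually the cheaper one; the helper names are illustrative):

import org.apache.spark.mllib.linalg.Vector

// Mean and variance straight from the local vector:
def vectorMean(v: Vector): Double = v.toArray.sum / v.size
def vectorVariance(v: Vector): Double = {
  val m = vectorMean(v)
  v.toArray.map(x => (x - m) * (x - m)).sum / v.size
}

// Or, if DoubleRDDFunctions is really wanted, parallelize the values first:
// val asRdd = sc.parallelize(v.toArray)   // RDD[Double]
// asRdd.mean(); asRdd.variance()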








Re: Is there a way to shutdown the derby in hive context in spark shell?

2015-07-09 Thread Terry Hole
Hi, Akhil,

I have tried, it does not work.

This may be related to the new added isolated classloader in spark hive
context, the error call stack is:
java.sql.SQLException: Failed to start database 'metastore_db' with class
loader
org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1@257edcaa, see
the next exception for details.
at org.apache.derby.impl.jdbc.SQLExceptionFactory40.getSQLException(Unknown
Source)
at org.apache.derby.impl.jdbc.Util.newEmbedSQLException(Unknown Source)
at org.apache.derby.impl.jdbc.Util.seeNextException(Unknown Source)
at org.apache.derby.impl.jdbc.EmbedConnection.bootDatabase(Unknown Source)
at org.apache.derby.impl.jdbc.EmbedConnection.init(Unknown Source)
at org.apache.derby.impl.jdbc.EmbedConnection40.init(Unknown Source)
at org.apache.derby.jdbc.Driver40.getNewEmbedConnection(Unknown Source)
at org.apache.derby.jdbc.InternalDriver.connect(Unknown Source)
at org.apache.derby.jdbc.Driver20.connect(Unknown Source)
at org.apache.derby.jdbc.AutoloadedDriver.connect(Unknown Source)
at java.sql.DriverManager.getConnection(DriverManager.java:664)
at java.sql.DriverManager.getConnection(DriverManager.java:208)
at
org.datanucleus.store.rdbms.datasource.dbcp.DriverManagerConnectionFactory.createConnection(DriverManagerConnectionFactory.java:78)
at
org.datanucleus.store.rdbms.datasource.dbcp.PoolableConnectionFactory.makeObject(PoolableConnectionFactory.java:582)
at
org.datanucleus.store.rdbms.datasource.dbcp.pool.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:1158)
at
org.datanucleus.store.rdbms.datasource.dbcp.PoolingDataSource.getConnection(PoolingDataSource.java:108)
at
org.datanucleus.store.rdbms.ConnectionFactoryImpl$ManagedConnectionImpl.getConnection(ConnectionFactoryImpl.java:501)
at
org.datanucleus.store.rdbms.RDBMSStoreManager.init(RDBMSStoreManager.java:298)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
at
org.datanucleus.plugin.NonManagedPluginRegistry.createExecutableExtension(NonManagedPluginRegistry.java:631)
at
org.datanucleus.plugin.PluginManager.createExecutableExtension(PluginManager.java:301)
at
org.datanucleus.NucleusContext.createStoreManagerForProperties(NucleusContext.java:1187)
at org.datanucleus.NucleusContext.initialise(NucleusContext.java:356)
at
org.datanucleus.api.jdo.JDOPersistenceManagerFactory.freezeConfiguration(JDOPersistenceManagerFactory.java:775)
at
org.datanucleus.api.jdo.JDOPersistenceManagerFactory.createPersistenceManagerFactory(JDOPersistenceManagerFactory.java:333)
at
org.datanucleus.api.jdo.JDOPersistenceManagerFactory.getPersistenceManagerFactory(JDOPersistenceManagerFactory.java:202)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at javax.jdo.JDOHelper$16.run(JDOHelper.java:1965)
at java.security.AccessController.doPrivileged(Native Method)
at javax.jdo.JDOHelper.invoke(JDOHelper.java:1960)
at
javax.jdo.JDOHelper.invokeGetPersistenceManagerFactoryOnImplementation(JDOHelper.java:1166)
at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:808)
at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:701)
at org.apache.hadoop.hive.metastore.ObjectStore.getPMF(ObjectStore.java:310)
at
org.apache.hadoop.hive.metastore.ObjectStore.getPersistenceManager(ObjectStore.java:339)
at
org.apache.hadoop.hive.metastore.ObjectStore.initialize(ObjectStore.java:248)
at
org.apache.hadoop.hive.metastore.ObjectStore.setConf(ObjectStore.java:223)
at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:73)
at
org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
at
org.apache.hadoop.hive.metastore.RawStoreProxy.init(RawStoreProxy.java:58)
at
org.apache.hadoop.hive.metastore.RawStoreProxy.getProxy(RawStoreProxy.java:67)
at
org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.newRawStore(HiveMetaStore.java:497)
at
org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.getMS(HiveMetaStore.java:475)
at
org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.createDefaultDB(HiveMetaStore.java:523)
at
org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.init(HiveMetaStore.java:397)
at
org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.init(HiveMetaStore.java:356)
at
org.apache.hadoop.hive.metastore.RetryingHMSHandler.init(RetryingHMSHandler.java:54)
at
org.apache.hadoop.hive.metastore.RetryingHMSHandler.getProxy(RetryingHMSHandler.java:59)
at
org.apache.hadoop.hive.metastore.HiveMetaStore.newHMSHandler(HiveMetaStore.java:4944)
at

Re: spark streaming performance

2015-07-09 Thread Tathagata Das
What were the number of cores in the executor? It could be that you had
only one core in the executor which did all the 50 tasks serially so 50
task X 15 ms = ~ 1 second.
Could you take a look at the task details in the stage page to see when the
tasks were added to see whether it explains the 5 second?

On Thu, Jul 9, 2015 at 12:21 AM, Michel Hubert mich...@vsnsystemen.nl
wrote:



 Hi,



 I’ve developed a POC Spark Streaming application.

 But it seems to perform better on my development machine  than on our
 cluster.

 I submit it to yarn on our cloudera cluster.



 But my first question is more detailed:



 In de application UI (:4040) I see in the streaming section that the batch
 processing took 6 sec.

 Then when I look at the stages I indeed see a stage with duration 5s.



 For example:

 1678

 map at LogonAnalysis.scala:215+details

 2015/07/09 09:17:00

 5 s

 50/50

 173.5 KB



 But when I look into the details of state 1678 it tells me the duration
 was 14 ms and the aggregated metrics by executor has 1.0s as Task Time.

 What is responsible for the gap between 14 ms, 1s and 5 sec?





 Details for Stage 1678
 - Total task time across all tasks: 0.8 s
 - Shuffle write: 173.5 KB / 2031

 Summary Metrics for 50 Completed Tasks

 Metric                       | Min         | 25th percentile | Median      | 75th percentile | Max
 Duration                     | 14 ms       | 14 ms           | 15 ms       | 15 ms           | 24 ms
 GC Time                      | 0 ms        | 0 ms            | 0 ms        | 0 ms            | 0 ms
 Shuffle Write Size / Records | 2.6 KB / 28 | 3.1 KB / 35     | 3.5 KB / 42 | 3.9 KB / 46     | 4.4 KB / 53

 Aggregated Metrics by Executor

 Executor ID | Address | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Shuffle Write Size / Records
 2           | :44231  | 1.0 s     | 50          | 0            | 50              | 173.5 KB / 2031













Re: S3 vs HDFS

2015-07-09 Thread Sujee Maniyam
latency is much bigger for S3 (if that matters)
And with HDFS you'd get data-locality that will boost your app performance.

I did some light experimenting on this.
see my presentation here for some benchmark numbers ..etc
http://www.slideshare.net/sujee/hadoop-to-sparkv2
from slide# 34

cheers
Sujee Maniyam (http://sujee.net | http://www.linkedin.com/in/sujeemaniyam )
teaching Spark
http://elephantscale.com/training/spark-for-developers/?utm_source=mailinglistutm_medium=emailutm_campaign=signature


On Wed, Jul 8, 2015 at 11:35 PM, Brandon White bwwintheho...@gmail.com
wrote:

 Are there any significant performance differences between reading text
 files from S3 and hdfs?



Questions about Fault tolerance of Spark

2015-07-09 Thread 牛兆捷
Hi All:

We already know that Spark utilizes the lineage to recompute the RDDs when
failure occurs.
I want to study the performance of this fault-tolerant approach and have
some questions about it.

1) Is there any benchmark (or standard failure model) to test the fault
tolerance of these kinds of in-memory data processing systems?

2) How do you emulate the failures in testing spark?  (e.g., kill a
computation task? or kill the computation nodes?)

Thanks!!!

-- 
*Regards,*
*Zhaojie*


Scheduler delay vs. Getting result time

2015-07-09 Thread Hans van den Bogert
Hi, 

In the Spark UI, under “Show additional metrics”, there are two extra metrics 
you can show 
1. Scheduler delay
2. Getting result time

When hovering “Scheduler Delay it says (among other things):
…time to send task result from executor…

When hovering “Getting result time”:
Time that the driver spends fetching task results from workers.

What are the differences between the two?

In my case I’m benchmarking with some sleep commands and returning some big 
arrays, per task, to emulate execution time and network communication 
respectively. I can’t see any “Getting Result Time” increases, they are simple 
0ms. I’m using a ‘collect’ command and can see the synthetic result arrays when 
I use a spark-shell.

Regards,

Hans



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Data Processing speed SQL Vs SPARK

2015-07-09 Thread vinod kumar
Hi Everyone,

I am new to spark.

Am using SQL in my application to handle data in my application.I have a
thought to move to spark now.

Is data processing speed of spark better than SQL server?

Thank,
Vinod


Re: databases currently supported by Spark SQL JDBC

2015-07-09 Thread ayan guha
I suppose every RDBMS has a JDBC driver to connect to. I know Oracle, MySQL,
SQL Server, Teradata, Netezza have one.
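
For example, the generic Spark 1.4 pattern looks like this (the driver class, URL and
table name are illustrative; any database whose JDBC driver is on the classpath should
work the same way):

val jdbcDF = sqlContext.read.format("jdbc").options(Map(
  "url"     -> "jdbc:mysql://db-host:3306/mydb",
  "dbtable" -> "myschema.mytable",
  "driver"  -> "com.mysql.jdbc.Driver"
)).load()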

On Thu, Jul 9, 2015 at 10:09 PM, Niranda Perera niranda.per...@gmail.com
wrote:

 Hi,

 I'm planning to use Spark SQL JDBC datasource provider in various RDBMS
 databases.

 what are the databases currently supported by Spark JDBC relation provider?

 rgds

 --
 Niranda
 @n1r44 https://twitter.com/N1R44
 https://pythagoreanscript.wordpress.com/




-- 
Best Regards,
Ayan Guha


Spark Streaming Hangs on Start

2015-07-09 Thread Bin Wang
I'm using spark streaming with Kafka, and submit it to YARN cluster with
mode yarn-cluster. But it hangs at SparkContext.start(). The Kafka config
is right since it can show some events in Streaming tab of web UI.

The attached file is the screen shot of the Jobs tab of web UI. The code
in the main class is:

object StatCounter {

  val config = ConfigFactory.load()
  val redisUrl = config.getString("redis.url")
  val redisPort = config.getInt("redis.port")
  val zkQuorum = config.getString("kafka.zkQuorum")
  val group = config.getString("kafka.group")
  val topic = config.getString("kafka.topic")
  val threadNum = config.getInt("kafka.threadNum")

  val cache = new RedisCache(redisUrl, redisPort)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName(config.getString("spark.name"))
      .set("spark.cassandra.connection.host", config.getString("cassandra.host"))

    val ssc = new StreamingContext(conf, Seconds(config.getInt("spark.interval")))
    ssc.checkpoint(config.getString("spark.checkpoint"))
    val storage = new CassandraStorage("adhoc_data", ssc)

    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, Map(topic -> threadNum)).map(_._2)

    val logs = lines.flatMap(line => Parser.parseBody(line, cache))
    Counter.count(logs, storage)

    sys.ShutdownHookThread {
      println("Gracefully stopping Spark Streaming Application")
      ssc.stop(stopSparkContext = true, stopGracefully = true)
      println("Application stopped")
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Breaking lineage and reducing stages in Spark Streaming

2015-07-09 Thread Anand Nalya
Hi,

I've an application in which an rdd is being updated with tuples coming
from RDDs in a DStream with following pattern.

dstream.foreachRDD(rdd => {
  myRDD = myRDD.union(rdd.filter(myfilter)).reduceByKey(_+_)
})

I'm using cache() and checkpointin to cache results. Over the time, the
lineage of myRDD keeps increasing and stages in each batch of dstream keeps
increasing, even though all the earlier stages are skipped. When the number
of stages grow big enough, the overall delay due to scheduling delay starts
increasing. The processing time for each batch is still fixed.

Following figures illustrate the problem:

Job execution: https://i.imgur.com/GVHeXH3.png?1


Delays: https://i.imgur.com/1DZHydw.png?1


Is there some pattern that I can use to avoid this?

Regards,
Anand
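
One commonly suggested pattern for this (a sketch only, reusing the names from the snippet
above and assuming sc.setCheckpointDir has been called): checkpoint the accumulated RDD
every N batches so its lineage is truncated instead of growing without bound.

import org.apache.spark.rdd.RDD

// Assumes the DStream batches contain (String, Long) pairs, as in the pattern above.
var myRDD: RDD[(String, Long)] = sc.emptyRDD
var batches = 0L

dstream.foreachRDD { rdd =>
  myRDD = myRDD.union(rdd.filter(myfilter)).reduceByKey(_ + _).cache()
  batches += 1
  if (batches % 10 == 0) {   // every 10 batches, cut the lineage
    myRDD.checkpoint()
    myRDD.count()            // force evaluation so the checkpoint actually happens
  }
}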


RE: SparkR dataFrame read.df fails to read from aws s3

2015-07-09 Thread Sun, Rui
Hi, Ben


1)  I guess this may be a JDK version mismatch. Could you check the JDK 
version?

2)  I believe this is a bug in SparkR. I will file a JIRA issue for it.

From: Ben Spark [mailto:ben_spar...@yahoo.com.au]
Sent: Thursday, July 9, 2015 12:14 PM
To: user
Subject: SparkR dataFrame read.df fails to read from aws s3

I have Spark 1.4 deployed on AWS EMR but methods of SparkR dataFrame read.df 
method cannot load data from aws s3.

1) read.df error message

 read.df(sqlContext, "s3://some-bucket/some.json", "json")

15/07/09 04:07:01 ERROR r.RBackendHandler: loadDF on 
org.apache.spark.sql.api.r.SQLUtils failed

java.lang.IllegalArgumentException: invalid method loadDF for object 
org.apache.spark.sql.api.r.SQLUtils

 at 
org.apache.spark.api.r.RBackendHandler.handleMethodCall(RBackendHandler.scala:143)

 at 
org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:74)

 at 
org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:36)
at 
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
2) jsonFile is working though with some warning message

Warning message:

In normalizePath(path) :

  path[1]=s3://rea-consumer-data-dev/cbr/profiler/output/20150618/part-0: 
No such file or directory


RE: Breaking lineage and reducing stages in Spark Streaming

2015-07-09 Thread Michel Hubert
Hi,

I was just wondering how you generated the second image with the charts.
What product?

From: Anand Nalya [mailto:anand.na...@gmail.com]
Sent: donderdag 9 juli 2015 11:48
To: spark users
Subject: Breaking lineage and reducing stages in Spark Streaming

Hi,

I've an application in which an rdd is being updated with tuples coming from 
RDDs in a DStream with following pattern.

dstream.foreachRDD(rdd => {
  myRDD = myRDD.union(rdd.filter(myfilter)).reduceByKey(_+_)
})

I'm using cache() and checkpointin to cache results. Over the time, the lineage 
of myRDD keeps increasing and stages in each batch of dstream keeps increasing, 
even though all the earlier stages are skipped. When the number of stages grow 
big enough, the overall delay due to scheduling delay starts increasing. The 
processing time for each batch is still fixed.

Following figures illustrate the problem:

Job execution: https://i.imgur.com/GVHeXH3.png?1
Delays: https://i.imgur.com/1DZHydw.png?1
Is there some pattern that I can use to avoid this?

Regards,
Anand


Spark Mesos task rescheduling

2015-07-09 Thread besil
Hi,

We are experiencing scheduling errors due to Mesos slaves failing.
It seems to be an open bug; more information can be found here.

https://issues.apache.org/jira/browse/SPARK-3289

According to this  link
https://mail-archives.apache.org/mod_mbox/mesos-user/201310.mbox/%3ccaakwvaxprrnrcdlazcybnmk1_9elyheodaf8urf8ssrlbac...@mail.gmail.com%3E
  
from the mail archive, it seems that Spark doesn't reschedule LOST tasks to
active executors, but keeps trying to reschedule them on the failed host.

We would like to dynamically resize our Mesos cluster (adding or removing
machines - using an AWS autoscaling group), but this bug kills our running
applications if a Mesos slave running a Spark executor is shut down.

Is any known workaround?

Thank you



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Mesos-task-rescheduling-tp23740.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Breaking lineage and reducing stages in Spark Streaming

2015-07-09 Thread Anand Nalya
Thats from the Streaming tab for Spark 1.4 WebUI.

On 9 July 2015 at 15:35, Michel Hubert mich...@vsnsystemen.nl wrote:

  Hi,



 I was just wondering how you generated to second image with the charts.

 What product?



 *From:* Anand Nalya [mailto:anand.na...@gmail.com]
 *Sent:* donderdag 9 juli 2015 11:48
 *To:* spark users
 *Subject:* Breaking lineage and reducing stages in Spark Streaming



 Hi,



 I've an application in which an rdd is being updated with tuples coming
 from RDDs in a DStream with following pattern.



 dstream.foreachRDD(rdd => {

   myRDD = myRDD.union(rdd.filter(myfilter)).reduceByKey(_+_)

 })



 I'm using cache() and checkpointin to cache results. Over the time, the
 lineage of myRDD keeps increasing and stages in each batch of dstream keeps
 increasing, even though all the earlier stages are skipped. When the number
 of stages grow big enough, the overall delay due to scheduling delay starts
 increasing. The processing time for each batch is still fixed.



 Following figures illustrate the problem:



 Job execution: https://i.imgur.com/GVHeXH3.png?1


 Delays: https://i.imgur.com/1DZHydw.png?1


 Is there some pattern that I can use to avoid this?



 Regards,

 Anand



Re: Breaking lineage and reducing stages in Spark Streaming

2015-07-09 Thread Dean Wampler
Is myRDD outside a DStream? If so are you persisting on each batch
iteration? It should be checkpointed frequently too.

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Thu, Jul 9, 2015 at 5:56 AM, Anand Nalya anand.na...@gmail.com wrote:

 The data coming from dstream have the same keys that are in myRDD, so the 
 reduceByKey
 after union keeps the overall tuple count in myRDD fixed. Or even with
 fixed tuple count, it will keep consuming more resources?

 On 9 July 2015 at 16:19, Tathagata Das t...@databricks.com wrote:

 If you are continuously unioning RDDs, then you are accumulating ever
 increasing data, and you are processing ever increasing amount of data in
 every batch. Obviously this is going to not last for very long. You
 fundamentally cannot keep processing ever increasing amount of data with
 finite resources, isnt it?

 On Thu, Jul 9, 2015 at 3:17 AM, Anand Nalya anand.na...@gmail.com
 wrote:

 Thats from the Streaming tab for Spark 1.4 WebUI.

 On 9 July 2015 at 15:35, Michel Hubert mich...@vsnsystemen.nl wrote:

  Hi,



 I was just wondering how you generated to second image with the charts.

 What product?



 *From:* Anand Nalya [mailto:anand.na...@gmail.com]
 *Sent:* donderdag 9 juli 2015 11:48
 *To:* spark users
 *Subject:* Breaking lineage and reducing stages in Spark Streaming



 Hi,



 I've an application in which an rdd is being updated with tuples coming
 from RDDs in a DStream with following pattern.



 dstream.foreachRDD(rdd => {

   myRDD = myRDD.union(rdd.filter(myfilter)).reduceByKey(_+_)

 })



 I'm using cache() and checkpointin to cache results. Over the time, the
 lineage of myRDD keeps increasing and stages in each batch of dstream keeps
 increasing, even though all the earlier stages are skipped. When the number
 of stages grow big enough, the overall delay due to scheduling delay starts
 increasing. The processing time for each batch is still fixed.



 Following figures illustrate the problem:



 Job execution: https://i.imgur.com/GVHeXH3.png?1


 Delays: https://i.imgur.com/1DZHydw.png?1


 Is there some pattern that I can use to avoid this?



 Regards,

 Anand







Re: Breaking lineage and reducing stages in Spark Streaming

2015-07-09 Thread Anand Nalya
Yes, myRDD is outside of DStream. Following is the actual code where newBase
and current are the rdds being updated with each batch:

  val base = sc.textFile...
  var newBase = base.cache()

  val dstream: DStream[String] = ssc.textFileStream...
  var current: RDD[(String, Long)] = sc.emptyRDD.cache()

  dstream.countByValue().checkpoint(Seconds(120)).foreachRDD(rdd => {

current = rdd.union(current.unpersist()).reduceByKey(_+_, 2)

val joined = current.leftOuterJoin(newBase).cache()
val toUpdate = joined.filter(myfilter).map(mymap).cache()
val toNotUpdate = joined.filter(mynotfilter).map(mymap).cache()

toUpdate.collect().foreach(println) // this goes to some store

newBase = newBase.unpersist().union(toUpdate).reduceByKey(_+_,
2).cache()

current = toNotUpdate.cache()

toUpdate.unpersist()
joined.unpersist()
rdd.unpersist()
  })


Regards,

Anand


On 9 July 2015 at 18:16, Dean Wampler deanwamp...@gmail.com wrote:

 Is myRDD outside a DStream? If so are you persisting on each batch
 iteration? It should be checkpointed frequently too.

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Thu, Jul 9, 2015 at 5:56 AM, Anand Nalya anand.na...@gmail.com wrote:

 The data coming from dstream have the same keys that are in myRDD, so the 
 reduceByKey
 after union keeps the overall tuple count in myRDD fixed. Or even with
 fixed tuple count, it will keep consuming more resources?

 On 9 July 2015 at 16:19, Tathagata Das t...@databricks.com wrote:

 If you are continuously unioning RDDs, then you are accumulating ever
 increasing data, and you are processing ever increasing amount of data in
 every batch. Obviously this is going to not last for very long. You
 fundamentally cannot keep processing ever increasing amount of data with
 finite resources, isnt it?

 On Thu, Jul 9, 2015 at 3:17 AM, Anand Nalya anand.na...@gmail.com
 wrote:

 Thats from the Streaming tab for Spark 1.4 WebUI.

 On 9 July 2015 at 15:35, Michel Hubert mich...@vsnsystemen.nl wrote:

  Hi,



 I was just wondering how you generated the second image with the charts.

 What product?



 *From:* Anand Nalya [mailto:anand.na...@gmail.com]
 *Sent:* donderdag 9 juli 2015 11:48
 *To:* spark users
 *Subject:* Breaking lineage and reducing stages in Spark Streaming



 Hi,



 I've an application in which an rdd is being updated with tuples
 coming from RDDs in a DStream with following pattern.



 dstream.foreachRDD(rdd = {

   myRDD = myRDD.union(rdd.filter(myfilter)).reduceByKey(_+_)

 })



 I'm using cache() and checkpointing to cache results. Over time, the
 lineage of myRDD keeps increasing, and the number of stages in each batch of
 the DStream keeps increasing, even though all the earlier stages are skipped.
 Once the number of stages grows large enough, the overall delay due to
 scheduling starts increasing. The processing time for each batch stays fixed.



 Following figures illustrate the problem:



 Job execution: https://i.imgur.com/GVHeXH3.png?1


 Delays: https://i.imgur.com/1DZHydw.png?1


 Is there some pattern that I can use to avoid this?



 Regards,

 Anand








Re: Spark 1.4.0 - Using SparkR on EC2 Instance

2015-07-09 Thread RedOakMark
That’s correct.  We were setting up a Spark EC2 cluster from the command line, 
then installing RStudio Server, logging into that through the web interface and 
attempting to initialize the cluster within RStudio.  

We have made some progress on this outside of the thread - I will see what I 
can compile and share as a potential walkthrough. 



 On Jul 8, 2015, at 9:25 PM, BenPorter [via Apache Spark User List] 
 ml-node+s1001560n23732...@n3.nabble.com wrote:
 
 RedOakMark - just to make sure I understand what you did.  You ran the EC2 
 script on a local machine to spin up the cluster, but then did not try to run 
 anything in R/RStudio from your local machine.  Instead you installed RStudio 
 on the driver and ran it as a local cluster from that driver.  Is that 
 correct?  Otherwise, you make no reference to the master/EC2 server in this 
 code, so I have to assume that means you were running this directly from the 
 master. 
 
 Thanks, 
 Ben 
 




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-4-0-Using-SparkR-on-EC2-Instance-tp23506p23742.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: S3 vs HDFS

2015-07-09 Thread Daniel Darabos
I recommend testing it for yourself. Even if you have no application, you
can just run the spark-ec2 script, log in, run spark-shell and try reading
files from an S3 bucket and from hdfs://master IP:9000/. (This is the
ephemeral HDFS cluster, which uses SSD.)

I just tested our application this way yesterday and found the SSD-based
HDFS to outperform S3 by a factor of 2. I don't know the cause. It may be
locality, as Akhil suggests, or SSD vs HDD (assuming S3 is HDD-backed). Or
the HDFS client library and protocol are just better than the S3 versions
(which are HTTP-based and use some 6-year-old libraries).
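
For a quick apples-to-apples check you can time both reads directly in
spark-shell; something along these lines (the bucket name, paths and master
IP are placeholders):

  def timed[T](label: String)(body: => T): T = {
    val start = System.nanoTime
    val result = body
    println(f"$label took ${(System.nanoTime - start) / 1e9}%.1f s")
    result
  }

  timed("S3 read")   { sc.textFile("s3n://my-bucket/data/*").count() }
  timed("HDFS read") { sc.textFile("hdfs://MASTER_IP:9000/data/*").count() }

Run each a couple of times so that OS page caching and S3 connection setup
do not skew a single measurement.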

On Thu, Jul 9, 2015 at 9:54 AM, Sujee Maniyam su...@sujee.net wrote:

 latency is much bigger for S3 (if that matters)
 And with HDFS you'd get data-locality that will boost your app performance.

 I did some light experimenting on this.
 see my presentation here for some benchmark numbers ..etc
 http://www.slideshare.net/sujee/hadoop-to-sparkv2
 from slide# 34

 cheers
 Sujee Maniyam (http://sujee.net | http://www.linkedin.com/in/sujeemaniyam
 )
 teaching Spark
 http://elephantscale.com/training/spark-for-developers/?utm_source=mailinglistutm_medium=emailutm_campaign=signature


 On Wed, Jul 8, 2015 at 11:35 PM, Brandon White bwwintheho...@gmail.com
 wrote:

 Are there any significant performance differences between reading text
 files from S3 and hdfs?





Re: Breaking lineage and reducing stages in Spark Streaming

2015-07-09 Thread Dean Wampler
I think you're complicating the cache behavior by aggressively re-using
vars when temporary vals would be more straightforward. For example,
newBase = newBase.unpersist()... effectively means that newBase's data is
not actually cached when the subsequent .union(...) is performed, so it
probably goes back to the lineage... Same with the current.unpersist logic
before it.

Names are cheap, so just use local vals:

val newCurrent = rdd.union(current).reduceByKey(_+_)
current.unpersist()

Also, what happens if you omit the 2 argument for the number of
partitions in reduceByKey?

Other minor points:

I would change the joined, toUpdate, toNotUpdate logic to this:

val joined = current.leftOuterJoin(newBase).map(mymap).cache()

val toUpdate = joined.filter(myfilter).cache()
val toNotUpdate = joined.filter(mynotfilter).cache()


Maybe it's just for this email example, but you don't need to call collect
on toUpdate before using foreach(println). If the RDD is huge, you
definitely don't want to do that.

Hope this helps.

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Thu, Jul 9, 2015 at 8:06 AM, Anand Nalya anand.na...@gmail.com wrote:

 Yes, myRDD is outside of DStream. Following is the actual code where newBase
 and current are the rdds being updated with each batch:

   val base = sc.textFile...
   var newBase = base.cache()

   val dstream: DStream[String] = ssc.textFileStream...
   var current: RDD[(String, Long)] = sc.emptyRDD.cache()

   dstream.countByValue().checkpoint(Seconds(120)).foreachRDD(rdd = {

 current = rdd.union(current.unpersist()).reduceByKey(_+_, 2)

 val joined = current.leftOuterJoin(newBase).cache()
 val toUpdate = joined.filter(myfilter).map(mymap).cache()
 val toNotUpdate = joined.filter(mynotfilter).map(mymap).cache()

 toUpdate.collect().foreach(println) // this goes to some store

 newBase = newBase.unpersist().union(toUpdate).reduceByKey(_+_,
 2).cache()

 current = toNotUpdate.cache()

 toUpdate.unpersist()
 joined.unpersist()
 rdd.unpersist()
   })


 Regards,

 Anand


 On 9 July 2015 at 18:16, Dean Wampler deanwamp...@gmail.com wrote:

 Is myRDD outside a DStream? If so are you persisting on each batch
 iteration? It should be checkpointed frequently too.

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Thu, Jul 9, 2015 at 5:56 AM, Anand Nalya anand.na...@gmail.com
 wrote:

 The data coming from dstream have the same keys that are in myRDD, so
 the reduceByKey after union keeps the overall tuple count in myRDD
 fixed. Or even with fixed tuple count, it will keep consuming more
 resources?

 On 9 July 2015 at 16:19, Tathagata Das t...@databricks.com wrote:

 If you are continuously unioning RDDs, then you are accumulating ever
 increasing data, and you are processing ever increasing amount of data in
 every batch. Obviously this is going to not last for very long. You
 fundamentally cannot keep processing ever increasing amount of data with
 finite resources, isnt it?

 On Thu, Jul 9, 2015 at 3:17 AM, Anand Nalya anand.na...@gmail.com
 wrote:

 Thats from the Streaming tab for Spark 1.4 WebUI.

 On 9 July 2015 at 15:35, Michel Hubert mich...@vsnsystemen.nl wrote:

  Hi,



 I was just wondering how you generated the second image with the
 charts.

 What product?



 *From:* Anand Nalya [mailto:anand.na...@gmail.com]
 *Sent:* donderdag 9 juli 2015 11:48
 *To:* spark users
 *Subject:* Breaking lineage and reducing stages in Spark Streaming



 Hi,



 I've an application in which an rdd is being updated with tuples
 coming from RDDs in a DStream with following pattern.



 dstream.foreachRDD(rdd = {

   myRDD = myRDD.union(rdd.filter(myfilter)).reduceByKey(_+_)

 })



 I'm using cache() and checkpointing to cache results. Over time, the
 lineage of myRDD keeps increasing, and the number of stages in each batch of
 the DStream keeps increasing, even though all the earlier stages are skipped.
 Once the number of stages grows large enough, the overall delay due to
 scheduling starts increasing. The processing time for each batch stays fixed.



 Following figures illustrate the problem:



 Job execution: https://i.imgur.com/GVHeXH3.png?1


 Delays: https://i.imgur.com/1DZHydw.png?1


 Is there some pattern that I can use to avoid this?



 Regards,

 Anand









Re: DLL load failed: %1 is not a valid win32 application on invoking pyspark

2015-07-09 Thread ashishdutt
Not really a clean solution but I solved the problem by reinstalling Anaconda 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/DLL-load-failed-1-is-not-a-valid-win32-application-on-invoking-pyspark-tp23733p23743.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Data Processing speed SQL Vs SPARK

2015-07-09 Thread ayan guha
It depends on the workload. How much data would you want to process?
On 9 Jul 2015 22:28, vinod kumar vinodsachin...@gmail.com wrote:

 Hi Everyone,

 I am new to spark.

 I am using SQL in my application to handle data. I am now thinking of
 moving to Spark.

 Is the data processing speed of Spark better than SQL Server?

 Thanks,
 Vinod



SPARK_WORKER_DIR and SPARK_LOCAL_DIR

2015-07-09 Thread corrius
Hello,

I have a 4-node Spark cluster running on EC2 and it's running out of disk
space. I'm running Spark 1.3.1.

I have mounted a second SSD disk in every instance on /tmp/spark and set
SPARK_LOCAL_DIRS and SPARK_WORKER_DIR to point to this folder:

set | grep SPARK
SPARK_LOCAL_DIRS=/tmp/spark
SPARK_WORKER_DIR=/tmp/spark

Once I start my cluster I can see that the Master gets these variables and
puts everything in /tmp/spark, but the workers are still using /tmp/ to spill
data to disk, which ends up filling the disk.

I also tried starting the workers with -d /tmp/spark and this only moves a
small file (app-...) from /opt/spark/work to my temp folder.

The folders and files I can still find in /tmp/ look like:
spark-39fa5e41-3ce4-40e9-b2a7-8f3739db604e

I don't know if I am missing something, any help would be much appreciated.





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SPARK-WORKER-DIR-and-SPARK-LOCAL-DIR-tp23754.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Pyspark not working on yarn-cluster mode

2015-07-09 Thread jegordon
Hi to all,

Is there any way to run pyspark scripts with yarn-cluster mode without using
the spark-submit script? I need it in this way because I will integrate this
code into a Django web app.

When i try to run any script in yarn-cluster mode i got the following error
:

org.apache.spark.SparkException: Detected yarn-cluster mode, but isn't
running on a cluster. Deployment to YARN is not supported directly by
SparkContext. Please use spark-submit.


I'm creating the sparkContext in the following way :

conf = (SparkConf()
.setMaster("yarn-cluster")
.setAppName("DataFrameTest"))

sc = SparkContext(conf = conf)

#Dataframe code 

Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Pyspark-not-working-on-yarn-cluster-mode-tp23755.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




[X-post] Saving SparkSQL result RDD to Cassandra

2015-07-09 Thread Su She
Hello All,

I also posted this on the Spark/Datastax thread, but thought it was also
50% a spark question (or mostly a spark question).

I was wondering what is the best practice to saving streaming Spark SQL (
https://github.com/Intel-bigdata/spark-streamingsql/blob/master/src/main/scala/org/apache/spark/sql/streaming/examples/KafkaDDL.scala)
results to Cassandra?

The query looks like this:

 streamSqlContext.sql(
   """
     |SELECT t.word, COUNT(t.word)
     |FROM (SELECT * FROM t_kafka) OVER (WINDOW '9' SECONDS, SLIDE '3' SECONDS) AS t
     |GROUP BY t.word
   """.stripMargin)
   .foreachRDD { r => r.toString() }.map(x =>
 x.split(",")).map(x => data(x(0), x(1))).saveToCassandra("demo", "sqltest")

I’m getting a message saying map isn’t a member of Unit.

I thought since I'm converting it to a string I can call a map/save to
Cassandra function there, but it seems like I can't call map after
r.toString()?

Please let me know if this is possible and what is the best way of doing
this. Thank you for the help!

-Su


Re: Spark serialization in closure

2015-07-09 Thread Andrew Or
Hi Chen,

I believe the issue is that `object foo` is a member of `object testing`,
so the only way to access `object foo` is to first pull `object testing`
into the closure, then access a pointer to get to `object foo`. There are
two workarounds that I'm aware of:

(1) Move `object foo` outside of `object testing`. This is only a problem
because of the nested objects. Also, by design it's simpler to reason about
but that's a separate discussion.

(2) Create a local variable for `foo.v`. If all your closure cares about is
the integer, then it makes sense to add a `val v = foo.v` inside `func` and
use this in your closure instead. This avoids pulling in $outer pointers
into your closure at all since it only references local variables.
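
A minimal sketch of workaround (2), keeping the nesting from the original
example but capturing only a local value (`sc` is assumed to be in scope):

  object testing {          // no longer needs to extend Serializable
    object foo {
      val v = 42
    }
    val list = List(1, 2, 3)
    val rdd = sc.parallelize(list)
    def func = {
      val v = foo.v                        // local copy; the closure captures only this Int
      val after = rdd.foreachPartition { it =>
        println(v)                         // no $outer pointer to `testing` is pulled in
      }
    }
  }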

As others have commented, I think this is more of a Scala problem than a
Spark one.

Let me know if these work,
-Andrew

2015-07-09 13:36 GMT-07:00 Richard Marscher rmarsc...@localytics.com:

 Reading that article and applying it to your observations of what happens
 at runtime:

 shouldn't the closure require serializing testing? The foo singleton
 object is a member of testing, and then you call this foo value in the
 closure func and further in the foreachPartition closure. So following by
 that article, Scala will attempt to serialize the containing object/class
 testing to get the foo instance.

 On Thu, Jul 9, 2015 at 4:11 PM, Chen Song chen.song...@gmail.com wrote:

 Repost the code example,

 object testing extends Serializable {
 object foo {
   val v = 42
 }
 val list = List(1,2,3)
 val rdd = sc.parallelize(list)
 def func = {
   val after = rdd.foreachPartition {
 it = println(foo.v)
   }
 }
   }

 On Thu, Jul 9, 2015 at 4:09 PM, Chen Song chen.song...@gmail.com wrote:

 Thanks Erik. I saw the document too. That is why I am confused because
 as per the article, it should be good as long as *foo *is serializable.
 However, what I have seen is that it would work if *testing* is
 serializable, even foo is not serializable, as shown below. I don't know if
 there is something specific to Spark.

 For example, the code example below works.

 object testing extends Serializable {

 object foo {

   val v = 42

 }

 val list = List(1,2,3)

 val rdd = sc.parallelize(list)

 def func = {

   val after = rdd.foreachPartition {

 it = println(foo.v)

   }

 }

   }

 On Thu, Jul 9, 2015 at 12:11 PM, Erik Erlandson e...@redhat.com wrote:

 I think you have stumbled across this idiosyncrasy:


 http://erikerlandson.github.io/blog/2015/03/31/hygienic-closures-for-scala-function-serialization/




 - Original Message -
  I am not sure this is more of a question for Spark or just Scala but
 I am
  posting my question here.
 
  The code snippet below shows an example of passing a reference to a
 closure
  in rdd.foreachPartition method.
 
  ```
  object testing {
  object foo extends Serializable {
val v = 42
  }
  val list = List(1,2,3)
  val rdd = sc.parallelize(list)
  def func = {
val after = rdd.foreachPartition {
  it = println(foo.v)
}
  }
}
  ```
  When running this code, I got an exception
 
  ```
  Caused by: java.io.NotSerializableException:
  $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$
  Serialization stack:
  - object not serializable (class:
  $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$, value:
  $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$@10b7e824)
  - field (class:
 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$$anonfun$1,
  name: $outer, type: class
 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$)
  - object (class
 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$$anonfun$1,
  function1)
  ```
 
  It looks like Spark needs to serialize `testing` object. Why is it
  serializing testing even though I only pass foo (another serializable
  object) in the closure?
 
  A more general question is, how can I prevent Spark from serializing
 the
  parent class where RDD is defined, with still support of passing in
  function defined in other classes?
 
  --
  Chen Song
 




 --
 Chen Song




 --
 Chen Song




 --
 --
 *Richard Marscher*
 Software Engineer
 Localytics
 Localytics.com http://localytics.com/ | Our Blog
 http://localytics.com/blog | Twitter http://twitter.com/localytics |
 Facebook http://facebook.com/localytics | LinkedIn
 http://www.linkedin.com/company/1148792?trk=tyah



RE: Feature Generation On Spark

2015-07-09 Thread Mohammed Guller
Take a look at the examples here:
https://spark.apache.org/docs/latest/ml-guide.html
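
In case it helps as a starting point, a minimal sketch of turning one
document per file into one feature vector per file with wholeTextFiles and
HashingTF (the input path and vector size are illustrative):

  import org.apache.spark.mllib.feature.HashingTF
  import org.apache.spark.mllib.linalg.Vector
  import org.apache.spark.rdd.RDD

  // wholeTextFiles yields one (path, content) pair per file,
  // so each file maps to exactly one vector
  val docs = sc.wholeTextFiles("hdfs:///path/to/docs")
  val tf = new HashingTF(numFeatures = 1 << 18)
  val vectors: RDD[(String, Vector)] =
    docs.mapValues(text => tf.transform(text.toLowerCase.split("\\s+").toSeq))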

Mohammed

From: rishikesh thakur [mailto:rishikeshtha...@hotmail.com]
Sent: Saturday, July 4, 2015 10:49 PM
To: ayan guha; Michal Čizmazia
Cc: user
Subject: RE: Feature Generation On Spark

I have one document per file and each file is to be converted to a feature 
vector. Pretty much like standard feature construction for document 
classification.

Thanks
Rishi

Date: Sun, 5 Jul 2015 01:44:04 +1000
Subject: Re: Feature Generation On Spark
From: guha.a...@gmail.com
To: mici...@gmail.com
CC: rishikeshtha...@hotmail.com; user@spark.apache.org

Do you have one document per file or multiple documents in the file?
On 4 Jul 2015 23:38, Michal Čizmazia mici...@gmail.com wrote:
Spark Context has a method wholeTextFiles. Is that what you need?

On 4 July 2015 at 07:04, rishikesh rishikeshtha...@hotmail.com wrote:
 Hi

 I am new to Spark and am working on document classification. Before model
 fitting I need to do feature generation. Each document is to be converted to
 a feature vector. However I am not sure how to do that. While testing
 locally I have a static list of tokens and when I parse a file I do a lookup
 and increment counters.

 In the case of Spark I can create an RDD which loads all the documents
 however I am not sure if one files goes to one executor or multiple. If the
 file is split then the feature vectors needs to be merged. But I am not able
 to figure out how to do that.

 Thanks
 Rishi



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Feature-Generation-On-Spark-tp23617.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Does spark guarantee that the same task will process the same key over time?

2015-07-09 Thread micvog
For example, in the simplest word count case, I want to update the count in
memory and always have the same word updated by the same task - not
use any distributed memstore.

I know that updateStateByKey should guarantee that, but how do you approach
this problem outside of spark streaming?
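
The closest I have found outside streaming is an explicit partitioner: a
HashPartitioner sends a given key to the same partition index on every run,
so per-partition state always sees the same words. A rough sketch (`words`
and the partition count are placeholders):

  import org.apache.spark.HashPartitioner

  val partitioned = words.map(w => (w, 1L))
    .partitionBy(new HashPartitioner(16))   // same word -> same partition index
    .cache()

  val counts = partitioned.reduceByKey(_ + _)  // reuses the existing partitioner, no extra shuffle

But as far as I understand, the partition-to-executor mapping is only stable
while the RDD stays cached; Spark does not pin a partition to a physical
machine across independent jobs.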

Thanks,
Michael



-
Michael Vogiatzis
@mvogiatzis 
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Does-spark-guarantee-that-the-same-task-will-process-the-same-key-over-time-tp23753.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Spark serialization in closure

2015-07-09 Thread Chen Song
Thanks Erik. I saw the document too. That is why I am confused because as
per the article, it should be good as long as *foo *is serializable.
However, what I have seen is that it would work if *testing* is
serializable, even foo is not serializable, as shown below. I don't know if
there is something specific to Spark.

For example, the code example below works.

object testing extends Serializable {

object foo {

  val v = 42

}

val list = List(1,2,3)

val rdd = sc.parallelize(list)

def func = {

  val after = rdd.foreachPartition {

it = println(foo.v)

  }

}

  }

On Thu, Jul 9, 2015 at 12:11 PM, Erik Erlandson e...@redhat.com wrote:

 I think you have stumbled across this idiosyncrasy:


 http://erikerlandson.github.io/blog/2015/03/31/hygienic-closures-for-scala-function-serialization/




 - Original Message -
  I am not sure this is more of a question for Spark or just Scala but I am
  posting my question here.
 
  The code snippet below shows an example of passing a reference to a
 closure
  in rdd.foreachPartition method.
 
  ```
  object testing {
  object foo extends Serializable {
val v = 42
  }
  val list = List(1,2,3)
  val rdd = sc.parallelize(list)
  def func = {
val after = rdd.foreachPartition {
  it = println(foo.v)
}
  }
}
  ```
  When running this code, I got an exception
 
  ```
  Caused by: java.io.NotSerializableException:
  $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$
  Serialization stack:
  - object not serializable (class:
  $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$, value:
  $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$@10b7e824)
  - field (class:
 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$$anonfun$1,
  name: $outer, type: class
 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$)
  - object (class
 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$$anonfun$1,
  function1)
  ```
 
  It looks like Spark needs to serialize `testing` object. Why is it
  serializing testing even though I only pass foo (another serializable
  object) in the closure?
 
  A more general question is, how can I prevent Spark from serializing the
  parent class where RDD is defined, with still support of passing in
  function defined in other classes?
 
  --
  Chen Song
 




-- 
Chen Song


Re: Spark serialization in closure

2015-07-09 Thread Chen Song
Repost the code example,

object testing extends Serializable {
object foo {
  val v = 42
}
val list = List(1,2,3)
val rdd = sc.parallelize(list)
def func = {
  val after = rdd.foreachPartition {
it = println(foo.v)
  }
}
  }

On Thu, Jul 9, 2015 at 4:09 PM, Chen Song chen.song...@gmail.com wrote:

 Thanks Erik. I saw the document too. That is why I am confused because as
 per the article, it should be good as long as *foo *is serializable.
 However, what I have seen is that it would work if *testing* is
 serializable, even foo is not serializable, as shown below. I don't know if
 there is something specific to Spark.

 For example, the code example below works.

 object testing extends Serializable {

 object foo {

   val v = 42

 }

 val list = List(1,2,3)

 val rdd = sc.parallelize(list)

 def func = {

   val after = rdd.foreachPartition {

 it = println(foo.v)

   }

 }

   }

 On Thu, Jul 9, 2015 at 12:11 PM, Erik Erlandson e...@redhat.com wrote:

 I think you have stumbled across this idiosyncrasy:


 http://erikerlandson.github.io/blog/2015/03/31/hygienic-closures-for-scala-function-serialization/




 - Original Message -
  I am not sure this is more of a question for Spark or just Scala but I
 am
  posting my question here.
 
  The code snippet below shows an example of passing a reference to a
 closure
  in rdd.foreachPartition method.
 
  ```
  object testing {
  object foo extends Serializable {
val v = 42
  }
  val list = List(1,2,3)
  val rdd = sc.parallelize(list)
  def func = {
val after = rdd.foreachPartition {
  it = println(foo.v)
}
  }
}
  ```
  When running this code, I got an exception
 
  ```
  Caused by: java.io.NotSerializableException:
  $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$
  Serialization stack:
  - object not serializable (class:
  $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$, value:
  $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$@10b7e824)
  - field (class:
 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$$anonfun$1,
  name: $outer, type: class
 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$)
  - object (class
 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$$anonfun$1,
  function1)
  ```
 
  It looks like Spark needs to serialize `testing` object. Why is it
  serializing testing even though I only pass foo (another serializable
  object) in the closure?
 
  A more general question is, how can I prevent Spark from serializing the
  parent class where RDD is defined, with still support of passing in
  function defined in other classes?
 
  --
  Chen Song
 




 --
 Chen Song




-- 
Chen Song


Re: Spark serialization in closure

2015-07-09 Thread Richard Marscher
Reading that article and applying it to your observations of what happens
at runtime:

shouldn't the closure require serializing testing? The foo singleton object
is a member of testing, and then you call this foo value in the closure
func and further in the foreachPartition closure. So, following that
article, Scala will attempt to serialize the containing object/class
testing to get the foo instance.

On Thu, Jul 9, 2015 at 4:11 PM, Chen Song chen.song...@gmail.com wrote:

 Repost the code example,

 object testing extends Serializable {
 object foo {
   val v = 42
 }
 val list = List(1,2,3)
 val rdd = sc.parallelize(list)
 def func = {
   val after = rdd.foreachPartition {
 it = println(foo.v)
   }
 }
   }

 On Thu, Jul 9, 2015 at 4:09 PM, Chen Song chen.song...@gmail.com wrote:

 Thanks Erik. I saw the document too. That is why I am confused because as
 per the article, it should be good as long as *foo *is serializable.
 However, what I have seen is that it would work if *testing* is
 serializable, even foo is not serializable, as shown below. I don't know if
 there is something specific to Spark.

 For example, the code example below works.

 object testing extends Serializable {

 object foo {

   val v = 42

 }

 val list = List(1,2,3)

 val rdd = sc.parallelize(list)

 def func = {

   val after = rdd.foreachPartition {

 it = println(foo.v)

   }

 }

   }

 On Thu, Jul 9, 2015 at 12:11 PM, Erik Erlandson e...@redhat.com wrote:

 I think you have stumbled across this idiosyncrasy:


 http://erikerlandson.github.io/blog/2015/03/31/hygienic-closures-for-scala-function-serialization/




 - Original Message -
  I am not sure this is more of a question for Spark or just Scala but I
 am
  posting my question here.
 
  The code snippet below shows an example of passing a reference to a
 closure
  in rdd.foreachPartition method.
 
  ```
  object testing {
  object foo extends Serializable {
val v = 42
  }
  val list = List(1,2,3)
  val rdd = sc.parallelize(list)
  def func = {
val after = rdd.foreachPartition {
  it = println(foo.v)
}
  }
}
  ```
  When running this code, I got an exception
 
  ```
  Caused by: java.io.NotSerializableException:
  $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$
  Serialization stack:
  - object not serializable (class:
  $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$, value:
  $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$@10b7e824)
  - field (class:
 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$$anonfun$1,
  name: $outer, type: class
 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$)
  - object (class
 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$$anonfun$1,
  function1)
  ```
 
  It looks like Spark needs to serialize `testing` object. Why is it
  serializing testing even though I only pass foo (another serializable
  object) in the closure?
 
  A more general question is, how can I prevent Spark from serializing
 the
  parent class where RDD is defined, with still support of passing in
  function defined in other classes?
 
  --
  Chen Song
 




 --
 Chen Song




 --
 Chen Song




-- 
-- 
*Richard Marscher*
Software Engineer
Localytics
Localytics.com http://localytics.com/ | Our Blog
http://localytics.com/blog | Twitter http://twitter.com/localytics |
Facebook http://facebook.com/localytics | LinkedIn
http://www.linkedin.com/company/1148792?trk=tyah


Friend recommendation using collaborative filtering?

2015-07-09 Thread Diogo B.
Dear list,

I have some questions regarding collaborative filtering. Although they are not 
specific to Spark, I hope the folks in this community might be able to help me 
somehow.

We are looking for a simple way how to recommend users to other users, i.e., 
how to recommend new friends.

Do you have any experience in using collaborative filtering 
(MatrixFactorization) to recommend users instead of products?
Are there any caveats we should be aware of or can we directly apply the method?

We considered using the similarity of users (based on the sets of common 
friends) to suggest new friends, but (1) iterating over the whole set of users 
sounded inefficient and (2) we are not sure the intersections between the
friend-sets are sufficiently large/diverse to yield personalized friendship
recommendations.
Would MatrixFactorization be more efficient? Would it yield somehow better 
results due to the latent factors? Any experiences on that?

Finally, our users are connected with binary values (like or dislike). Is such 
information sufficient to feed into the algorithm, or does the algorithm 
require a score from 1 to N for explicit feedback or a  number of occurrences, 
visits, messages exchanged, etc for implicit feedback?
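
For concreteness, this is roughly the implicit-feedback variant we have in
mind, treating each "A likes B" edge as a rating of 1.0 (the input path,
format and parameters are just placeholders):

  import org.apache.spark.mllib.recommendation.{ALS, Rating}

  val likes = sc.textFile("hdfs:///edges/likes.csv")
    .map(_.split(","))
    .map { case Array(a, b) => Rating(a.toInt, b.toInt, 1.0) }  // user -> user edge

  // rank = 10, iterations = 10, lambda = 0.01, alpha = 1.0
  val model = ALS.trainImplicit(likes, 10, 10, 0.01, 1.0)
  val candidates = model.recommendProducts(42, 5)  // top-5 suggested friends for user 42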

I would be very grateful about any pointers.

Cheers,
Diogo

PS: I know there are many many papers on these topics, but I am first trying to 
collect evidence that this is the right direction for us.



Re: Breaking lineage and reducing stages in Spark Streaming

2015-07-09 Thread Tathagata Das
Summarizing the main problems discussed by Dean

1. If you have an infinitely growing lineage, bad things will eventually
happen. You HAVE TO periodically (say, every 10th batch) checkpoint the
information.

2. Unpersist the previous `current` RDD ONLY AFTER running an action on the
`newCurrent`. Otherwise you are throwing current out of the cache before
newCurrent has been computed. Modifying Dean's example.

val newCurrent = rdd.union(current).reduceByKey(_+_)
...
// join with newCurrent
// collect or count or any action that uses newCurrent
//

// Now you can unpersist because the newCurrent has been persisted and won't
require falling back to this cached current RDD.
current.unpersist()
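
Putting 1. and 2. together, a minimal sketch of the update loop (assumes
sc.setCheckpointDir(...) has been called; the every-10th-batch interval is
only an example):

  import org.apache.spark.rdd.RDD

  var current: RDD[(String, Long)] = sc.emptyRDD[(String, Long)].cache()
  var batchId = 0L

  dstream.countByValue().foreachRDD { rdd =>
    val newCurrent = rdd.union(current).reduceByKey(_ + _).cache()
    if (batchId % 10 == 0) newCurrent.checkpoint()  // periodically truncate the lineage
    newCurrent.count()                              // action: materialize (and checkpoint) newCurrent
    current.unpersist()                             // only now drop the old cached RDD
    current = newCurrent
    batchId += 1
  }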


On Thu, Jul 9, 2015 at 6:36 AM, Dean Wampler deanwamp...@gmail.com wrote:

 I think you're complicating the cache behavior by aggressively re-using
 vars when temporary vals would be more straightforward. For example,
 newBase = newBase.unpersist()... effectively means that newBase's data is
 not actually cached when the subsequent .union(...) is performed, so it
 probably goes back to the lineage... Same with the current.unpersist logic
 before it.

 Names are cheap, so just use local vals:

 val newCurrent = rdd.union(current).reduceByKey(_+_)
 current.unpersist()

 Also, what happens if you omit the 2 argument for the number of
 partitions in reduceByKey?

 Other minor points:

 I would change the joined, toUpdate, toNotUpdate logic to this:

 val joined = current.leftOuterJoin(newBase).map(mymap).cache()

 val toUpdate = joined.filter(myfilter).cache()
 val toNotUpdate = joined.filter(mynotfilter).cache()


 Maybe it's just for this email example, but you don't need to call collect
 on toUpdate before using foreach(println). If the RDD is huge, you
 definitely don't want to do that.

 Hope this helps.

 dean

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Thu, Jul 9, 2015 at 8:06 AM, Anand Nalya anand.na...@gmail.com wrote:

 Yes, myRDD is outside of DStream. Following is the actual code where newBase
 and current are the rdds being updated with each batch:

   val base = sc.textFile...
   var newBase = base.cache()

   val dstream: DStream[String] = ssc.textFileStream...
   var current: RDD[(String, Long)] = sc.emptyRDD.cache()

   dstream.countByValue().checkpoint(Seconds(120)).foreachRDD(rdd = {

 current = rdd.union(current.unpersist()).reduceByKey(_+_, 2)

 val joined = current.leftOuterJoin(newBase).cache()
 val toUpdate = joined.filter(myfilter).map(mymap).cache()
 val toNotUpdate = joined.filter(mynotfilter).map(mymap).cache()

 toUpdate.collect().foreach(println) // this goes to some store

 newBase = newBase.unpersist().union(toUpdate).reduceByKey(_+_,
 2).cache()

 current = toNotUpdate.cache()

 toUpdate.unpersist()
 joined.unpersist()
 rdd.unpersist()
   })


 Regards,

 Anand


 On 9 July 2015 at 18:16, Dean Wampler deanwamp...@gmail.com wrote:

 Is myRDD outside a DStream? If so are you persisting on each batch
 iteration? It should be checkpointed frequently too.

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Thu, Jul 9, 2015 at 5:56 AM, Anand Nalya anand.na...@gmail.com
 wrote:

 The data coming from dstream have the same keys that are in myRDD, so
 the reduceByKey after union keeps the overall tuple count in myRDD
 fixed. Or even with fixed tuple count, it will keep consuming more
 resources?

 On 9 July 2015 at 16:19, Tathagata Das t...@databricks.com wrote:

 If you are continuously unioning RDDs, then you are accumulating ever
 increasing data, and you are processing ever increasing amount of data in
 every batch. Obviously this is going to not last for very long. You
 fundamentally cannot keep processing ever increasing amount of data with
 finite resources, isnt it?

 On Thu, Jul 9, 2015 at 3:17 AM, Anand Nalya anand.na...@gmail.com
 wrote:

 Thats from the Streaming tab for Spark 1.4 WebUI.

 On 9 July 2015 at 15:35, Michel Hubert mich...@vsnsystemen.nl
 wrote:

  Hi,



 I was just wondering how you generated the second image with the
 charts.

 What product?



 *From:* Anand Nalya [mailto:anand.na...@gmail.com]
 *Sent:* donderdag 9 juli 2015 11:48
 *To:* spark users
 *Subject:* Breaking lineage and reducing stages in Spark Streaming



 Hi,



 I've an application in which an rdd is being updated with tuples
 coming from RDDs in a DStream with following pattern.



 dstream.foreachRDD(rdd = {

   myRDD = myRDD.union(rdd.filter(myfilter)).reduceByKey(_+_)

 })



 I'm using cache() and checkpointing to cache results. Over time,
 the lineage of myRDD keeps increasing and stages in 

Spark serialization in closure

2015-07-09 Thread Chen Song
I am not sure this is more of a question for Spark or just Scala but I am
posting my question here.

The code snippet below shows an example of passing a reference to a closure
in rdd.foreachPartition method.

```
object testing {
object foo extends Serializable {
  val v = 42
}
val list = List(1,2,3)
val rdd = sc.parallelize(list)
def func = {
  val after = rdd.foreachPartition {
    it => println(foo.v)
  }
}
  }
```
When running this code, I got an exception

```
Caused by: java.io.NotSerializableException:
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$
Serialization stack:
- object not serializable (class:
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$, value:
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$@10b7e824)
- field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$$anonfun$1,
name: $outer, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$)
- object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$$anonfun$1,
function1)
```

It looks like Spark needs to serialize `testing` object. Why is it
serializing testing even though I only pass foo (another serializable
object) in the closure?

A more general question is, how can I prevent Spark from serializing the
parent class where RDD is defined, with still support of passing in
function defined in other classes?

-- 
Chen Song


Re: Spark serialization in closure

2015-07-09 Thread Erik Erlandson
I think you have stumbled across this idiosyncrasy:

http://erikerlandson.github.io/blog/2015/03/31/hygienic-closures-for-scala-function-serialization/




- Original Message -
 I am not sure this is more of a question for Spark or just Scala but I am
 posting my question here.
 
 The code snippet below shows an example of passing a reference to a closure
 in rdd.foreachPartition method.
 
 ```
 object testing {
 object foo extends Serializable {
   val v = 42
 }
 val list = List(1,2,3)
 val rdd = sc.parallelize(list)
 def func = {
   val after = rdd.foreachPartition {
 it = println(foo.v)
   }
 }
   }
 ```
 When running this code, I got an exception
 
 ```
 Caused by: java.io.NotSerializableException:
 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$
 Serialization stack:
 - object not serializable (class:
 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$, value:
 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$@10b7e824)
 - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$$anonfun$1,
 name: $outer, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$)
 - object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$$anonfun$1,
 function1)
 ```
 
 It looks like Spark needs to serialize `testing` object. Why is it
 serializing testing even though I only pass foo (another serializable
 object) in the closure?
 
 A more general question is, how can I prevent Spark from serializing the
 parent class where RDD is defined, with still support of passing in
 function defined in other classes?
 
 --
 Chen Song
 




DataFrame insertInto fails, saveAsTable works (Azure HDInsight)

2015-07-09 Thread Daniel Haviv
Hi,
I'm running Spark 1.4 on Azure.
DataFrame's insertInto fails, but saveAsTable works.
It seems like some issue with accessing Azure's blob storage but that
doesn't explain why one type of write works and the other doesn't.

This is the stack trace:

Caused by: org.apache.hadoop.fs.azure.AzureException:
org.apache.hadoop.fs.azure.KeyProviderException: Unable to load key
provider class.

at
org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.createAzureStorageSession(AzureNativeFileSystemStore.java:938)

at
org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.initialize(AzureNativeFileSystemStore.java:438)

at
org.apache.hadoop.fs.azure.NativeAzureFileSystem.initialize(NativeAzureFileSystem.java:1048)

at
org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2596)

at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)

at
org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)

at
org.apache.hadoop.fs.FileSystem$Cache.getUnique(FileSystem.java:2618)

at org.apache.hadoop.fs.FileSystem.newInstance(FileSystem.java:417)

at
org.apache.hadoop.hive.shims.Hadoop23Shims.getNonCachedFileSystem(Hadoop23Shims.java:574)

at
org.apache.hadoop.hive.ql.exec.Utilities.createDirsWithPermission(Utilities.java:3424)

at
org.apache.hadoop.hive.ql.exec.Utilities.createDirsWithPermission(Utilities.java:3396)

at org.apache.hadoop.hive.ql.Context.getScratchDir(Context.java:214)

... 59 more

Caused by: org.apache.hadoop.fs.azure.KeyProviderException: Unable to load
key provider class.

at
org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.getAccountKeyFromConfiguration(AzureNativeFileSystemStore.java:829)

at
org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.createAzureStorageSession(AzureNativeFileSystemStore.java:917)

... 70 more

Caused by: java.lang.ClassNotFoundException: Class
org.apache.hadoop.fs.azure.ShellDecryptionKeyProvider not found

at
org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:1980)

at
org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.getAccountKeyFromConfiguration(AzureNativeFileSystemStore.java:826)

... 71 more


Thanks,

Daniel


Re: work around Size exceeds Integer.MAX_VALUE

2015-07-09 Thread Michal Čizmazia
Spark version 1.4.0 in the Standalone mode

2015-07-09 20:12:02 INFO  (sparkDriver-akka.actor.default-dispatcher-3)
BlockManagerInfo:59 - Added rdd_0_0 on disk on localhost:51132 (size: 29.8
GB)
2015-07-09 20:12:02 ERROR (Executor task launch worker-0) Executor:96 -
Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:836)
at
org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
at
org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:127)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)
at
org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:509)
at
org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:427)
at org.apache.spark.storage.BlockManager.get(BlockManager.scala:615)
at
org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:154)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


On 9 July 2015 at 18:11, Ted Yu yuzhih...@gmail.com wrote:

 Which release of Spark are you using ?

 Can you show the complete stack trace ?

 getBytes() could be called from:
 getBytes(file, 0, file.length)
 or:
 getBytes(segment.file, segment.offset, segment.length)

 Cheers

 On Thu, Jul 9, 2015 at 2:50 PM, Michal Čizmazia mici...@gmail.com wrote:

 Please could anyone give me pointers for appropriate SparkConf to work
 around Size exceeds Integer.MAX_VALUE?

 Stacktrace:

 2015-07-09 20:12:02 INFO  (sparkDriver-akka.actor.default-dispatcher-3)
 BlockManagerInfo:59 - Added rdd_0_0 on disk on localhost:51132 (size: 29.8
 GB)
 2015-07-09 20:12:02 ERROR (Executor task launch worker-0) Executor:96 -
 Exception in task 0.0 in stage 0.0 (TID 0)
 java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
 at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:836)
 at
 org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
 ...





Re: spark streaming performance

2015-07-09 Thread Tathagata Das
I am not sure why you are getting NODE_LOCAL and not PROCESS_LOCAL. Also,
there is probably no better documentation than the configuration page -
http://spark.apache.org/docs/latest/configuration.html (search for
locality).
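
The relevant knobs are the spark.locality.wait* properties. For example, to
fall back from PROCESS_LOCAL to NODE_LOCAL faster (values are illustrative):

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    .setAppName("streaming-app")
    .set("spark.locality.wait", "500ms")          // overall wait before dropping a locality level
    .set("spark.locality.wait.process", "500ms")  // wait specifically for a PROCESS_LOCAL slot

There are also spark.locality.wait.node and spark.locality.wait.rack if you
want to tune the levels independently.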

On Thu, Jul 9, 2015 at 5:51 AM, Michel Hubert mich...@vsnsystemen.nl
wrote:







 StorageLevel.MEMORY_ONLY



 Streaming / Batch Processing Statistics / Processing Time:

 Executor-cores NOT specified: 5s

 Executor-cores 8: 400  ms



 Processing time has significantly reduced with executor-cores set to 8.

 But what is the general rule of thumb for a good executor-cores setting?



 But 257 records/sec still isn’t very fast, is it?

 Statistics over last 95 processed batches / Receiver Statistics:

   Receiver: SocketReceiver-0
   Status: ACTIVE
   Location: bfravicsvr81440-cld.opentsp.com
   Records in last batch [2015/07/09 14:49:47]: 1352
   Minimum rate [records/sec]: 0
   Median rate [records/sec]: 257
   Maximum rate [records/sec]: 279
   Last Error: -







 “After that the somehow the system is not able to launch any process
 local task”

 Where can I look for an answer why some tasks are NODE_LOCAL and others
 are PROCESS_LOCAL?

 Where should I look for the reason the locality wait expires?


  Details for Stage 147

  Total task time across all tasks: 0.4 s
  Shuffle write: 704.0 B / 26

  Summary Metrics for 26 Completed Tasks

  Metric                          Min         25th pct    Median      75th pct    Max
  Duration                        9 ms        11 ms       12 ms       24 ms       30 ms
  GC Time                         0 ms        0 ms        0 ms        0 ms        0 ms
  Shuffle Write Size / Records    27.0 B / 1  27.0 B / 1  27.0 B / 1  27.0 B / 1  28.0 B / 1
 Aggregated Metrics by Executor

  Executor ID  Address            Task Time  Total Tasks  Failed Tasks  Succeeded Tasks  Shuffle Write Size / Records
  1            xxx81440xxx:38882  0.8 s      25           0             25               677.0 B / 25
  2            xxx81441xxx:46832  45 ms      1            0             1                27.0 B / 1
Tasks

  (The GC Time, Write Time and Errors columns were empty for all rows and are omitted.)

  Index  ID    Attempt  Status   Locality Level  Executor ID / Host  Launch Time          Duration  Shuffle Write Size / Records
  0      1029  0        SUCCESS  NODE_LOCAL      1 / xxx81440xxx     2015/07/09 14:43:55  30 ms     27.0 B / 1
  2      1031  0        SUCCESS  NODE_LOCAL      1 / xxx81440xxx     2015/07/09 14:43:55  29 ms     27.0 B / 1
  1      1030  0        SUCCESS  NODE_LOCAL      1 / xxx81440xxx     2015/07/09 14:43:55  30 ms     27.0 B / 1
  5      1034  0        SUCCESS  NODE_LOCAL      1 / xxx81440xxx     2015/07/09 14:43:55  25 ms     27.0 B / 1
  4      1033  0        SUCCESS  NODE_LOCAL      1 / xxx81440xxx     2015/07/09 14:43:55  28 ms     27.0 B / 1
  3      1032  0        SUCCESS  NODE_LOCAL      1 / xxx81440xxx     2015/07/09 14:43:55  27 ms     27.0 B / 1
  25     1036  0        SUCCESS  PROCESS_LOCAL   2 / xxx81441xxx     2015/07/09 14:43:55  23 ms     27.0 B / 1
  6      1035  0        SUCCESS  NODE_LOCAL      1 / xxx81440xxx     2015/07/09 14:43:55  24 ms     27.0 B / 1
  7      1037  0        SUCCESS  NODE_LOCAL      1 / xxx81440xxx     2015/07/09 14:43:55  10 ms     27.0 B / 1
  8      1038  0        SUCCESS  NODE_LOCAL      1 / xxx81440xxx     2015/07/09 14:43:55  12 ms     27.0 B / 1
  9      1039  0        SUCCESS  NODE_LOCAL      1 / xxx81440xxx     2015/07/09 14:43:55  11 ms     27.0 B / 1
  10     1040  0        SUCCESS  NODE_LOCAL      1 / xxx81440xxx     2015/07/09 14:43:55  11 ms     27.0 B / 1
  11     1041  0        SUCCESS  NODE_LOCAL      1 / xxx81440xxx     2015/07/09 14:43:55  11 ms     27.0 B / 1
  12     1042  0        SUCCESS  NODE_LOCAL      1 / xxx81440xxx     ...

 [Message clipped]


Number of Threads in Executor to process Tasks

2015-07-09 Thread Aniruddh Sharma
Hi

I am new to Spark. I am confused about the correlation between threads and
physical cores.

As per my understanding, the number of tasks created matches the number of
partitions in the data set. For example, if I have a machine with 10
physical cores and a data set with 100 partitions, then 100 tasks (one per
partition) will be created in the Executor JVM.

Query 1) How is it decided how many threads are created in the Executor to
execute these 100 tasks, and who creates these threads?

Query 2) Does the parameter total-executor-cores define how many threads
will be launched in the executor JVM to process tasks? If not, then what is
the meaning of total-executor-cores in the context of both threads inside
the Executor JVM and physical cores?

Thanks and Regards
Aniruddh


work around Size exceeds Integer.MAX_VALUE

2015-07-09 Thread Michal Čizmazia
Please could anyone give me pointers for appropriate SparkConf to work
around Size exceeds Integer.MAX_VALUE?

Stacktrace:

2015-07-09 20:12:02 INFO  (sparkDriver-akka.actor.default-dispatcher-3)
BlockManagerInfo:59 - Added rdd_0_0 on disk on localhost:51132 (size: 29.8
GB)
2015-07-09 20:12:02 ERROR (Executor task launch worker-0) Executor:96 -
Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:836)
at
org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
...


How to specify PATHS for user defined functions.

2015-07-09 Thread Dan Dong
Hi, All,
  I have a function and want to access it in my Spark programs, but I got:
Exception in thread "main" java.lang.NoSuchMethodError in spark-submit. I
put the function under:
./src/main/scala/com/aaa/MYFUNC/MYFUNC.scala:

package com.aaa.MYFUNC

object MYFUNC{
  def FUNC1(input: List[String]) = {
  ..
}
  }



and in my Spark program I import it like:

import com.aaa.MYFUNC._
...
   val aaa = List("import", "org", "apache", "spark", "SparkContext")
   val res = MYFUNC.FUNC1(aaa)
...

But after I run sbt package, set the CLASSPATH and spark-submit the
program, I get the above error. It's strange that I can import this package
and run val res = MYFUNC.FUNC1(aaa) under a spark-shell
successfully. What are the possible problems? Thanks!

Cheers,
Dan


Re: work around Size exceeds Integer.MAX_VALUE

2015-07-09 Thread Ted Yu
Which release of Spark are you using ?

Can you show the complete stack trace ?

getBytes() could be called from:
getBytes(file, 0, file.length)
or:
getBytes(segment.file, segment.offset, segment.length)

Cheers

On Thu, Jul 9, 2015 at 2:50 PM, Michal Čizmazia mici...@gmail.com wrote:

 Please could anyone give me pointers for appropriate SparkConf to work
 around Size exceeds Integer.MAX_VALUE?

 Stacktrace:

 2015-07-09 20:12:02 INFO  (sparkDriver-akka.actor.default-dispatcher-3)
 BlockManagerInfo:59 - Added rdd_0_0 on disk on localhost:51132 (size: 29.8
 GB)
 2015-07-09 20:12:02 ERROR (Executor task launch worker-0) Executor:96 -
 Exception in task 0.0 in stage 0.0 (TID 0)
 java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
 at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:836)
 at
 org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
 ...




Re: Pyspark not working on yarn-cluster mode

2015-07-09 Thread Marcelo Vanzin
You cannot run Spark in cluster mode by instantiating a SparkContext like
that.

You have to launch it with the spark-submit command line script.

On Thu, Jul 9, 2015 at 2:23 PM, jegordon jgordo...@gmail.com wrote:

 Hi to all,

 Is there any way to run pyspark scripts with yarn-cluster mode without
 using
 the spark-submit script? I need it in this way because i will integrate
 this
 code into a django web app.

 When i try to run any script in yarn-cluster mode i got the following error
 :

 org.apache.spark.SparkException: Detected yarn-cluster mode, but isn't
 running on a cluster. Deployment to YARN is not supported directly by
 SparkContext. Please use spark-submit.


 I'm creating the sparkContext in the following way :

 conf = (SparkConf()
 .setMaster(yarn-cluster)
 .setAppName(DataFrameTest))

 sc = SparkContext(conf = conf)

 #Dataframe code 

 Thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Pyspark-not-working-on-yarn-cluster-mode-tp23755.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
Marcelo


Re: change default storage level

2015-07-09 Thread Michal Čizmazia
Thanks Shixiong! Your response helped me to understand the role of
persist(). No persist() calls were required indeed. I solved my problem by
setting spark.local.dir to allow more space for Spark temporary folder. It
works automatically. I am seeing logs like this:

Not enough space to cache rdd_0_1 in memory!
Persisting partition rdd_0_1 to disk instead.

Before I was getting:

No space left on device
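
For reference, the setting looks roughly like this in code (the path is
illustrative; SPARK_LOCAL_DIRS set on the workers, if present, overrides
this property):

  import org.apache.spark.{SparkConf, SparkContext}

  val conf = new SparkConf()
    .setAppName("my-app")
    .set("spark.local.dir", "/mnt/big-disk/spark-tmp")  // scratch space for shuffle files and disk-persisted blocks
  val sc = new SparkContext(conf)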


On 9 July 2015 at 11:57, Shixiong Zhu zsxw...@gmail.com wrote:

 Spark won't store RDDs to memory unless you use a memory StorageLevel. By
 default, your input and intermediate results won't be put into memory. You
 can call persist if you want to avoid duplicate computation or reading.
 E.g.,

 val r1 = context.wholeTextFiles(...)
 val r2 = r1.flatMap(s - ...)
 val r3 = r2.filter(...)...
 r3.saveAsTextFile(...)
 val r4 = r2.map(...)...
 r4.saveAsTextFile(...)

 In the avoid example, r2 will be used twice. To speed up the computation,
 you can call r2.persist(StorageLevel.MEMORY) to store r2 into memory. Then
 r4 will use the data of r2 in memory directly. E.g.,

 val r1 = context.wholeTextFiles(...)
 val r2 = r1.flatMap(s => ...)
 r2.persist(StorageLevel.MEMORY_ONLY)
 val r3 = r2.filter(...)...
 r3.saveAsTextFile(...)
 val r4 = r2.map(...)...
 r4.saveAsTextFile(...)

 See
 http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence


 Best Regards,
 Shixiong Zhu

 2015-07-09 22:09 GMT+08:00 Michal Čizmazia mici...@gmail.com:

 Is there a way how to change the default storage level?

 If not, how can I properly change the storage level wherever necessary,
 if my input and intermediate results do not fit into memory?

 In this example:

 context.wholeTextFiles(...)
 .flatMap(s - ...)
 .flatMap(s - ...)

 Does persist() need to be called after every transformation?

  context.wholeTextFiles(...)
 .persist(StorageLevel.MEMORY_AND_DISK)
 .flatMap(s - ...)
 .persist(StorageLevel.MEMORY_AND_DISK)
 .flatMap(s - ...)
 .persist(StorageLevel.MEMORY_AND_DISK)

  Thanks!





Re: [X-post] Saving SparkSQL result RDD to Cassandra

2015-07-09 Thread Todd Nist
foreachRDD returns Unit:

def foreachRDD(foreachFunc: (RDD[T]) ⇒ Unit): Unit
(RDD: https://spark.apache.org/docs/latest/api/scala/org/apache/spark/rdd/RDD.html)

Apply a function to each RDD in this DStream. This is an output operator,
so 'this' DStream will be registered as an output stream and therefore
materialized.

Change it to a map, foreach or some other form of transform.
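
As a minimal sketch (not tested here, and assuming the spark-cassandra-connector
import plus the `data` case class from the original post), the map and the
Cassandra save can live inside the foreachRDD block instead of being chained
onto its Unit result:

import com.datastax.spark.connector._   // adds saveToCassandra to RDDs

streamSqlContext.sql(query)             // the same windowed word-count query as below
  .foreachRDD { rdd =>
    rdd.map(row => data(row(0).toString, row(1).toString))
       .saveToCassandra("demo", "sqltest")
  }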

HTH

-Todd


On Thu, Jul 9, 2015 at 5:24 PM, Su She suhsheka...@gmail.com wrote:

 Hello All,

 I also posted this on the Spark/Datastax thread, but thought it was also
 50% a spark question (or mostly a spark question).

 I was wondering what the best practice is for saving streaming Spark SQL (
 https://github.com/Intel-bigdata/spark-streamingsql/blob/master/src/main/scala/org/apache/spark/sql/streaming/examples/KafkaDDL.scala)
 results to Cassandra?

 The query looks like this:

 streamSqlContext.sql(
   """
     |SELECT t.word, COUNT(t.word)
     |FROM (SELECT * FROM t_kafka) OVER (WINDOW '9' SECONDS, SLIDE '3' SECONDS) AS t
     |GROUP BY t.word
   """.stripMargin)
   .foreachRDD { r => r.toString() }
   .map(x => x.split(","))
   .map(x => data(x(0), x(1)))
   .saveToCassandra("demo", "sqltest")

 I’m getting a message saying map isn’t a member of Unit.

 I thought since I'm converting it to a string I can call a map/save to
 Cassandra function there, but it seems like I can't call map after
 r.toString()?

 Please let me know if this is possible and what is the best way of doing
 this. Thank you for the help!

 -Su



Re: work around Size exceeds Integer.MAX_VALUE

2015-07-09 Thread Matei Zaharia
This means that one of your cached RDD partitions is bigger than 2 GB of data.
You can fix it by having more partitions. If you read data from a file system
like HDFS or S3, set the number of partitions higher in the sc.textFile,
hadoopFile, etc. methods (it's an optional second parameter to those methods).
If you create the RDD through parallelize, or if this particular RDD comes from
a shuffle, use more tasks in the parallelize or shuffle.
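
For instance, a minimal sketch of both options (the path and partition counts
below are placeholders):

// ask for more partitions when reading (optional minPartitions argument)
val lines = sc.textFile("hdfs:///path/to/input", 500)

// or split up an RDD that already exists / comes out of a shuffle
val smallerPartitions = existingRdd.repartition(500)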

Matei

 On Jul 9, 2015, at 3:35 PM, Michal Čizmazia mici...@gmail.com wrote:
 
 Spark version 1.4.0 in the Standalone mode
 
 2015-07-09 20:12:02 INFO  (sparkDriver-akka.actor.default-dispatcher-3) 
 BlockManagerInfo:59 - Added rdd_0_0 on disk on localhost:51132 (size: 29.8 GB)
 2015-07-09 20:12:02 ERROR (Executor task launch worker-0) Executor:96 - 
 Exception in task 0.0 in stage 0.0 (TID 0)
 java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
 at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:836)
 at 
 org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
 at 
 org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113)
 at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285)
 at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:127)
 at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)
 at 
 org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:509)
 at 
 org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:427)
 at org.apache.spark.storage.BlockManager.get(BlockManager.scala:615)
 at 
 org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:154)
 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
 at 
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
 at 
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
 at 
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
 at 
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:70)
 at 
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
 at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)
 
 
 On 9 July 2015 at 18:11, Ted Yu yuzhih...@gmail.com wrote:
 Which release of Spark are you using ?
 
 Can you show the complete stack trace ?
 
 getBytes() could be called from:
 getBytes(file, 0, file.length)
 or:
 getBytes(segment.file, segment.offset, segment.length)
 
 Cheers
 
 On Thu, Jul 9, 2015 at 2:50 PM, Michal Čizmazia mici...@gmail.com wrote:
 Please could anyone give me pointers for appropriate SparkConf to work around 
 Size exceeds Integer.MAX_VALUE?
 
 Stacktrace:
 
 2015-07-09 20:12:02 INFO  (sparkDriver-akka.actor.default-dispatcher-3) 
 BlockManagerInfo:59 - Added rdd_0_0 on disk on localhost:51132 (size: 29.8 GB)
 2015-07-09 20:12:02 ERROR (Executor task launch worker-0) Executor:96 - 
 Exception in task 0.0 in stage 0.0 (TID 0)
 java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
 at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:836)
 at 
 org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
 ...
 
 
 



Re: Spark Streaming Hangs on Start

2015-07-09 Thread Bin Wang
Thanks for the help. I set --executor-cores and it works now. I had been using
--total-executor-cores and didn't realize it had changed.
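
For reference, a minimal sketch of the per-executor setting in SparkConf (the
app name and core count are only illustrative; --executor-cores on the
spark-submit command line has the same effect):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("StreamingApp")          // placeholder name
  .set("spark.executor.cores", "2")    // cores per executor, so the long-running
                                       // receiver does not occupy the only core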

Tathagata Das t...@databricks.com wrote on Friday, July 10, 2015 at 3:11 AM:

 1. There will be a long-running job with the description start(), as that is
 the job that runs the receivers. It will never end.

 2. You need to set the number of cores given to the Spark executors by the
 YARN container. That is the SparkConf setting spark.executor.cores, or
 --executor-cores in spark-submit. Since it is 1 by default, your only
 container has one core, which is occupied by the receiver, leaving no cores
 to run the map tasks. So the map stage is blocked.

 3. Note these log lines, especially "15/07/09 18:29:00 INFO
 receiver.ReceiverSupervisorImpl: Received stop signal". I think somehow
 your streaming context is being shut down too early, which is causing the
 KafkaReceiver to stop. Something you should debug.


 15/07/09 18:27:13 INFO consumer.ConsumerFetcherThread: 
 [ConsumerFetcherThread-adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201-0-42],
  Starting
 15/07/09 18:27:13 INFO consumer.ConsumerFetcherManager: 
 [ConsumerFetcherManager-1436437633199] Added fetcher for partitions 
 ArrayBuffer([[adhoc_data,0], initOffset 53 to broker 
 id:42,host:szq1.appadhoc.com,port:9092] )
 15/07/09 18:27:13 INFO storage.MemoryStore: ensureFreeSpace(1680) called with 
 curMem=96628, maxMem=16669841817
 15/07/09 18:27:13 INFO storage.MemoryStore: Block input-0-1436437633600 
 stored as bytes in memory (estimated size 1680.0 B, free 15.5 GB)
 15/07/09 18:27:13 WARN storage.BlockManager: Block input-0-1436437633600 
 replicated to only 0 peer(s) instead of 1 peers
 15/07/09 18:27:14 INFO receiver.BlockGenerator: Pushed block input-0-1436437633600
 15/07/09 18:29:00 INFO receiver.ReceiverSupervisorImpl: Received stop signal
 15/07/09 18:29:00 INFO receiver.ReceiverSupervisorImpl: Stopping receiver
 with message: Stopped by driver:
 15/07/09 18:29:00 INFO consumer.ZookeeperConsumerConnector: 
 [adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201], 
 ZKConsumerConnector shutting down
 15/07/09 18:29:00 INFO consumer.ConsumerFetcherManager: 
 [ConsumerFetcherManager-1436437633199] Stopping leader finder thread
 15/07/09 18:29:00 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: 
 [adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201-leader-finder-thread],
  Shutting down
 15/07/09 18:29:00 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: 
 [adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201-leader-finder-thread],
  Stopped
 15/07/09 18:29:00 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: 
 [adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201-leader-finder-thread],
  Shutdown completed
 15/07/09 18:29:00 INFO consumer.ConsumerFetcherManager: 
 [ConsumerFetcherManager-1436437633199] Stopping all fetchers
 15/07/09 18:29:00 INFO consumer.ConsumerFetcherThread: 
 [ConsumerFetcherThread-adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201-0-42],
  Shutting down
 15/07/09 18:29:01 INFO consumer.SimpleConsumer: Reconnect due to socket 
 error: java.nio.channels.ClosedByInterruptException
 15/07/09 18:29:01 INFO consumer.ConsumerFetcherThread: 
 [ConsumerFetcherThread-adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201-0-42],
  Stopped
 15/07/09 18:29:01 INFO consumer.ConsumerFetcherThread: 
 [ConsumerFetcherThread-adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201-0-42],
  Shutdown completed
 15/07/09 18:29:01 INFO consumer.ConsumerFetcherManager: 
 [ConsumerFetcherManager-1436437633199] All connections stopped
 15/07/09 18:29:01 INFO zkclient.ZkEventThread: Terminate ZkClient event 
 thread.
 15/07/09 18:29:01 INFO zookeeper.ZooKeeper: Session: 0x14e70eedca00315 closed
 15/07/09 18:29:01 INFO zookeeper.ClientCnxn: EventThread shut down
 15/07/09 18:29:01 INFO consumer.ZookeeperConsumerConnector: 
 [adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201], 
 ZKConsumerConnector shutdown completed in 74 ms
 15/07/09 18:29:01 INFO receiver.ReceiverSupervisorImpl: Called receiver onStop
 15/07/09 18:29:01 INFO receiver.ReceiverSupervisorImpl: Deregistering 
 receiver 0





