Re: scalac crash when compiling DataTypeConversions.scala

2014-10-23 Thread Patrick Wendell
Hey Ryan,

I've found that filing issues with the Scala/Typesafe JIRA is pretty
helpful if the issue can be fully reproduced, and even sometimes
helpful if it can't. You can file bugs here:

https://issues.scala-lang.org/secure/Dashboard.jspa

The Spark SQL code in particular is typically the source of these, as
we use fancier Scala features there. In a pinch it is also possible to
recompile and test code without building SQL if you just run tests for
the specific module (e.g. streaming). In sbt this more or less just
works:

sbt/sbt streaming/test-only a.b.c*

In Maven it's clunkier, but if you do a mvn install first then (I
think) you can test sub-modules independently:

mvn test -pl streaming ...

- Patrick

On Wed, Oct 22, 2014 at 10:00 PM, Ryan Williams
ryan.blake.willi...@gmail.com wrote:
 I started building Spark / running Spark tests this weekend and on maybe
 5-10 occasions have run into a compiler crash while compiling
 DataTypeConversions.scala.

 Here is a full gist of an innocuous test command (mvn test
 -Dsuites='*KafkaStreamSuite') exhibiting this behavior. Problem starts on
 L512 and there's a final stack trace at the bottom.

 mvn clean or ./sbt/sbt clean fix it (I believe I've observed the issue
 while compiling with each tool), but are annoying/time-consuming to do,
 obvs, and it's happening pretty frequently for me when doing only small
 numbers of incremental compiles punctuated by e.g. checking out different
 git commits.

 Have other people seen this? This post on this list is basically the same
 error, but in TestSQLContext.scala and this SO post claims to be hitting it
 when trying to build in intellij.

 It seems likely to be a bug in scalac; would finding a consistent repro case
 and filing it somewhere be useful?

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: About Memory usage in the Spark UI

2014-10-23 Thread Patrick Wendell
It shows the amount of memory used to store RDD blocks, which are created
when you run .cache()/.persist() on an RDD.

On Wed, Oct 22, 2014 at 10:07 PM, Haopu Wang hw...@qilinsoft.com wrote:

 Hi, please take a look at the attached screenshot. I wonder what the
 Memory Used column means.

 I give 2GB of memory to the driver process and 12GB of memory to the executor
 process.

 Thank you!






Re: Spark Hive Snappy Error

2014-10-23 Thread arthur.hk.c...@gmail.com
Hi,

Removed "export CLASSPATH=$HBASE_HOME/lib/hadoop-snappy-0.0.1-SNAPSHOT.jar".

It works, THANK YOU!!

Regards 
Arthur
 

On 23 Oct, 2014, at 1:00 pm, Shao, Saisai saisai.s...@intel.com wrote:

 It seems you just added the snappy library to your classpath:
  
  export CLASSPATH=$HBASE_HOME/lib/hadoop-snappy-0.0.1-SNAPSHOT.jar
  
 But Spark itself depends on snappy-0.2.jar. Is there any possibility
 that this problem is caused by a different version of snappy?
  
 Thanks
 Jerry
  
 From: arthur.hk.c...@gmail.com [mailto:arthur.hk.c...@gmail.com] 
 Sent: Thursday, October 23, 2014 11:32 AM
 To: Shao, Saisai
 Cc: arthur.hk.c...@gmail.com; user
 Subject: Re: Spark Hive Snappy Error
  
 Hi,
  
 Please find the attached file.
  
  
  
 my spark-default.xml
 # Default system properties included when running spark-submit.
 # This is useful for setting default environmental settings.
 #
 # Example:
 # spark.master            spark://master:7077
 # spark.eventLog.enabled  true
 # spark.eventLog.dir      hdfs://namenode:8021/directory
 # spark.serializer        org.apache.spark.serializer.KryoSerializer
 #
 spark.executor.memory           2048m
 spark.shuffle.spill.compress    false
 spark.io.compression.codec      org.apache.spark.io.SnappyCompressionCodec
  
  
  
 my spark-env.sh
 #!/usr/bin/env bash
 export CLASSPATH=$HBASE_HOME/lib/hadoop-snappy-0.0.1-SNAPSHOT.jar
 export CLASSPATH=$CLASSPATH:$HIVE_HOME/lib/mysql-connector-java-5.1.31-bin.jar
 export JAVA_LIBRARY_PATH=$HADOOP_HOME/lib/native/Linux-amd64-64
 export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-/etc/hadoop}
 export SPARK_WORKER_DIR=/edh/hadoop_data/spark_work/
 export SPARK_LOG_DIR=/edh/hadoop_logs/spark
 export SPARK_LIBRARY_PATH=$HADOOP_HOME/lib/native/Linux-amd64-64
 export SPARK_CLASSPATH=$SPARK_HOME/lib_managed/jars/mysql-connector-java-5.1.31-bin.jar
 export SPARK_CLASSPATH=$SPARK_CLASSPATH:$HBASE_HOME/lib/*:$HIVE_HOME/csv-serde-1.1.2-0.11.0-all.jar:
 export SPARK_WORKER_MEMORY=2g
 export HADOOP_HEAPSIZE=2000
 export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=m35:2181,m33:2181,m37:2181"
 export SPARK_JAVA_OPTS=" -XX:+UseConcMarkSweepGC"
  
  
 ll $HADOOP_HOME/lib/native/Linux-amd64-64
 -rw-rw-r--. 1 tester tester   50523 Aug 27 14:12 hadoop-auth-2.4.1.jar
 -rw-rw-r--. 1 tester tester 1062640 Aug 27 12:19 libhadoop.a
 -rw-rw-r--. 1 tester tester 1487564 Aug 27 11:14 libhadooppipes.a
 lrwxrwxrwx. 1 tester tester      24 Aug 27 07:08 libhadoopsnappy.so -> libhadoopsnappy.so.0.0.1
 lrwxrwxrwx. 1 tester tester      24 Aug 27 07:08 libhadoopsnappy.so.0 -> libhadoopsnappy.so.0.0.1
 -rwxr-xr-x. 1 tester tester   54961 Aug 27 07:08 libhadoopsnappy.so.0.0.1
 -rwxrwxr-x. 1 tester tester  630328 Aug 27 12:19 libhadoop.so
 -rwxrwxr-x. 1 tester tester  630328 Aug 27 12:19 libhadoop.so.1.0.0
 -rw-rw-r--. 1 tester tester  582472 Aug 27 11:14 libhadooputils.a
 -rw-rw-r--. 1 tester tester  298626 Aug 27 11:14 libhdfs.a
 -rwxrwxr-x. 1 tester tester  200370 Aug 27 11:14 libhdfs.so
 -rwxrwxr-x. 1 tester tester  200370 Aug 27 11:14 libhdfs.so.0.0.0
 lrwxrwxrwx. 1 tester tester      55 Aug 27 07:08 libjvm.so -> /usr/lib/jvm/jdk1.6.0_45/jre/lib/amd64/server/libjvm.so
 lrwxrwxrwx. 1 tester tester      25 Aug 27 07:08 libprotobuf-lite.so -> libprotobuf-lite.so.8.0.0
 lrwxrwxrwx. 1 tester tester      25 Aug 27 07:08 libprotobuf-lite.so.8 -> libprotobuf-lite.so.8.0.0
 -rwxr-xr-x. 1 tester tester  964689 Aug 27 07:08 libprotobuf-lite.so.8.0.0
 lrwxrwxrwx. 1 tester tester      20 Aug 27 07:08 libprotobuf.so -> libprotobuf.so.8.0.0
 lrwxrwxrwx. 1 tester tester      20 Aug 27 07:08 libprotobuf.so.8 -> libprotobuf.so.8.0.0
 -rwxr-xr-x. 1 tester tester 8300050 Aug 27 07:08 libprotobuf.so.8.0.0
 lrwxrwxrwx. 1 tester tester      18 Aug 27 07:08 libprotoc.so -> libprotoc.so.8.0.0
 lrwxrwxrwx. 1 tester tester      18 Aug 27 07:08 libprotoc.so.8 -> libprotoc.so.8.0.0
 -rwxr-xr-x. 1 tester tester 9935810 Aug 27 07:08 libprotoc.so.8.0.0
 -rw-r--r--. 1 tester tester  233554 Aug 27 15:19 libsnappy.a
 lrwxrwxrwx. 1 tester tester      23 Aug 27 11:32 libsnappy.so -> /usr/lib64/libsnappy.so
 lrwxrwxrwx. 1 tester tester      23 Aug 27 11:33 libsnappy.so.1 -> /usr/lib64/libsnappy.so
 -rwxr-xr-x. 1 tester tester  147726 Aug 27 07:08 libsnappy.so.1.2.0
 drwxr-xr-x. 2 tester tester    4096 Aug 27 07:08 pkgconfig
  
  
 Regards
 Arthur
  
  
 On 23 Oct, 2014, at 10:57 am, Shao, Saisai saisai.s...@intel.com wrote:
 
 
 Hi Arthur,
  
 I think your problem might be different from what
 SPARK-3958 (https://issues.apache.org/jira/browse/SPARK-3958) describes. It
 seems more likely to be a library linking problem. Would you mind
 checking your Spark runtime to see whether snappy.so is loaded
 (through lsof -p)?
  
 I guess your problem is more likely to be a library-not-found problem.
  
  
 Thanks
 

Is Spark streaming suitable for our architecture?

2014-10-23 Thread Albert Vila
Hi

I'm evaluating Spark Streaming to see if it can help scale our current
architecture.

We are currently downloading and processing 6M documents per day from
online and social media. We have a different workflow for each type of
document, but some of the steps are keyword extraction, language detection,
clustering, classification, indexation, etc. We are using Gearman to
dispatch the jobs to workers and we have some queues in a database.
Everything is in near real time.

I'm wondering whether it's feasible to integrate Spark Streaming into the
current workflow. One of our main discussions is whether we have to go to a
fully distributed architecture or to a semi-distributed one. I mean,
distribute everything, or process some steps on the same machine (crawling,
keyword extraction, language detection, indexation). We don't know which
one scales better; each one has pros and cons.

Right now we have a semi-distributed one, as we had network problems given
the amount of data we were moving around. So now, all documents
crawled on server X are later dispatched through Gearman to the same
server. What we dispatch through Gearman is only the document id, and the
document data remains on the crawling server in Memcached, so the network
traffic is kept to a minimum.

Is it feasible to remove all the database queues and Gearman and move to Spark
Streaming? We are evaluating adding Kafka to the system too.
Is anyone using Spark Streaming for a system like ours?
Should we worry about the network traffic, or is it something Spark can
manage without problems? Every document is around 50 KB (roughly 300 GB a day).
If we wanted to isolate some steps to be processed on the same machine/s
(or give them priority), is that something we could do with Spark?
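
For a sense of scale, the figures quoted above can be turned into average rates. The sketch below is only back-of-the-envelope arithmetic: it assumes "50k" means 50,000 bytes and that documents arrive evenly over the day, neither of which is stated in the post, and the class and method names are made up for illustration.

```java
public class IngestRateSketch {
    static final long SECONDS_PER_DAY = 24L * 60 * 60;

    /** Total bytes per day for a given document count and average document size. */
    public static long bytesPerDay(long docsPerDay, long avgDocBytes) {
        return docsPerDay * avgDocBytes;
    }

    /** Average per-second rate, assuming a perfectly even arrival rate (an assumption). */
    public static double perSecond(long perDay) {
        return (double) perDay / SECONDS_PER_DAY;
    }

    public static void main(String[] args) {
        long docs = 6_000_000L; // documents per day, from the post
        long size = 50_000L;    // "around 50k", read here as 50,000 bytes (assumption)
        long total = bytesPerDay(docs, size);
        System.out.printf("%.1f GB/day, %.1f docs/s, %.2f MB/s%n",
                total / 1e9, perSecond(docs), perSecond(total) / 1e6);
    }
}
```

Under those assumptions the averages come to about 69 documents and about 3.5 MB per second. Real traffic is usually bursty, so peak rates would be higher than these averages.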

Any help or comment will be appreciated. And if someone has had a similar
problem and has knowledge about the architecture approach, it will be more than
welcome.

Thanks


Dynamically loaded Spark-stream consumer

2014-10-23 Thread Jianshi Huang
I have a use case where I need to continuously ingest data from a Kafka
stream. However, apart from ingestion (to HBase), I also need to compute
some metrics (e.g. avg for the last minute, etc.).

The problem is that it's very likely I'll keep adding more metrics, and
I don't want to restart my Spark program every time.

Is there a mechanism by which Spark Streaming can load and plug in code at
runtime without restarting?

Any solutions or suggestions?

Thanks,
-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


How to access objects declared and initialized outside the call() method of JavaRDD

2014-10-23 Thread Localhost shell
Hey All,

I am unable to access objects declared and initialized outside the call()
method of JavaRDD.

In the code snippet below, the call() method makes a fetch call to C*, but since
javaSparkContext is defined outside the scope of the call() method, the compiler
gives a compilation error.

stringRdd.foreach(new VoidFunction<String>() {
    @Override
    public void call(String str) throws Exception {
        JavaRDD<String> vals =
            javaFunctions(javaSparkContext).cassandraTable(schema, table,
                String.class)
            .select(val);
    }
});

In other languages I have used closures to do this, but I am not able to achieve
the same here.

Can someone suggest how to achieve this in the current code context?


--Unilocal


RE: About Memory usage in the Spark UI

2014-10-23 Thread Haopu Wang
Patrick, thanks for the response. May I ask more questions?

I'm running a Spark Streaming application which receives data from a socket and
does some transformations.

The event injection rate is too high, so the processing duration is longer than
the batch interval.

So I see the "Could not compute split, block input-0-1414049609200 not found"
issue, as discussed by others in this post:
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Could-not-compute-split-block-not-found-td11186.html#a11237

If my understanding is correct, Spark runs short of storage in this case because
of the event pile-up, so it needs to delete some splits in order to free memory.

However, even in this case, I still see a very small number (like 3MB) in the
Memory Used column, while the total memory seems to be quite big (like 6GB).
So I think the number shown in this column may have problems.

How does Spark calculate the total memory based on the allocated JVM heap size? I
guess it's related to the spark.storage.memoryFraction configuration, but I
want to know the details.

And why does the driver also use memory to store RDD blocks?

Thanks again for the answer!
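
On the question of how the total is derived from the heap size: as far as I understand the Spark 1.x releases current at the time of this thread, the storage pool is sized as the JVM's maximum heap multiplied by spark.storage.memoryFraction (default 0.6) and by spark.storage.safetyFraction (default 0.9). The sketch below only reproduces that arithmetic. The class and method names are hypothetical, the defaults are quoted from memory rather than from this thread, and later Spark versions use a different memory manager.

```java
public class StorageMemorySketch {
    /**
     * Approximate size of the RDD storage pool under the assumed formula:
     * max heap * memoryFraction * safetyFraction.
     */
    public static long storageBytes(long maxHeapBytes,
                                    double memoryFraction,
                                    double safetyFraction) {
        return (long) (maxHeapBytes * memoryFraction * safetyFraction);
    }

    public static void main(String[] args) {
        // Runtime.maxMemory() is what the JVM reports as its maximum heap size.
        long heap = Runtime.getRuntime().maxMemory();
        // 0.6 and 0.9 are the assumed defaults for spark.storage.memoryFraction
        // and spark.storage.safetyFraction.
        System.out.println(storageBytes(heap, 0.6, 0.9));
    }
}
```

For a 12GB executor heap this gives at most about 6.5GB, and a little less in practice because maxMemory() typically reports slightly less than -Xmx. That is consistent with the total of roughly 6GB mentioned above.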

 



From: Patrick Wendell [mailto:pwend...@gmail.com] 
Sent: October 23, 2014 14:00
To: Haopu Wang
Cc: user
Subject: Re: About Memory usage in the Spark UI

 

It shows the amount of memory used to store RDD blocks, which are created when 
you run .cache()/.persist() on an RDD.

 

On Wed, Oct 22, 2014 at 10:07 PM, Haopu Wang hw...@qilinsoft.com wrote:

Hi, please take a look at the attached screenshot. I wonder what the
Memory Used column means.

 

I give 2GB memory to the driver process and 12GB memory to the executor process.

 

Thank you!

 

 

 



Which is better? One spark app listening to 10 topics vs. 10 spark apps each listening to 1 topic

2014-10-23 Thread Jianshi Huang
The Kafka stream has 10 topics and the data rate is quite high (~ 100K/s
per topic).

Which configuration do you recommend?
- 1 Spark app consuming all Kafka topics
- 10 separate Spark apps, each consuming one topic

Assuming they share the same resource pool.

Cheers,
-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


NoClassDefFoundError on ThreadFactoryBuilder in Intellij

2014-10-23 Thread Stephen Boesch
After checking out master/HEAD, the following error occurs when
attempting to run any test in IntelliJ:

Exception in thread "main" java.lang.NoClassDefFoundError:
com/google/common/util/concurrent/ThreadFactoryBuilder
at org.apache.spark.util.Utils$.<init>(Utils.scala:648)


There appears to be a related issue/JIRA:


https://issues.apache.org/jira/browse/SPARK-3217


But the conditions described do not apply in my case:

 Did you by any chance do one of the following:

   - forget to clean after pulling that change
   - mix sbt and mvn built artifacts in the same build
   - set SPARK_PREPEND_CLASSES


For reference here is the full stacktrace:

Exception in thread "main" java.lang.NoClassDefFoundError:
com/google/common/util/concurrent/ThreadFactoryBuilder
at org.apache.spark.util.Utils$.<init>(Utils.scala:648)
at org.apache.spark.util.Utils$.<clinit>(Utils.scala)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:179)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:119)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:134)
at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:62)
at org.apache.spark.sql.hbase.JavaHBaseSQLContext$.main(JavaHBaseSQLContext.scala:45)
at org.apache.spark.sql.hbase.JavaHBaseSQLContext.main(JavaHBaseSQLContext.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
Caused by: java.lang.ClassNotFoundException:
com.google.common.util.concurrent.ThreadFactoryBuilder
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
... 13 more
Exception in thread "delete Spark temp dirs"
java.lang.NoClassDefFoundError: Could not initialize class
org.apache.spark.util.Utils$
at org.apache.spark.util.Utils$$anon$4.run(Utils.scala:173)


Re: How to access objects declared and initialized outside the call() method of JavaRDD

2014-10-23 Thread Sean Owen
In Java, javaSparkContext would have to be declared final in order for
it to be accessed inside an anonymous inner class like this. But this would
still not work, as the context is not serializable. You should rewrite
this so that you are not attempting to use the Spark context inside an
RDD operation.
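
Both points can be seen without Spark at all. The following is a minimal plain-Java sketch, not Spark code: VoidFn and DriverOnlyContext are hypothetical stand-ins for a serializable function interface and a non-serializable context object. It shows that a final local can be captured by an anonymous inner class, and that Java serialization of the resulting function object fails when the captured object is not Serializable.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ClosureCaptureSketch {
    /** Hypothetical stand-in for a serializable function interface. */
    public interface VoidFn<T> extends Serializable {
        void call(T value) throws Exception;
    }

    /** Hypothetical stand-in for a driver-side object that is not Serializable. */
    public static class DriverOnlyContext {
        String lookup(String key) { return "value-for-" + key; }
    }

    /** Returns true if Java serialization of the given object succeeds. */
    public static boolean canSerialize(Object fn) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(fn);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /** Compiles because ctx is final, but the captured object cannot be serialized. */
    public static VoidFn<String> capturingContext() {
        final DriverOnlyContext ctx = new DriverOnlyContext();
        return new VoidFn<String>() {
            @Override
            public void call(String str) {
                ctx.lookup(str);
            }
        };
    }

    /** Captures only a String, which is Serializable, so the function serializes fine. */
    public static VoidFn<String> capturingPlainData() {
        final String table = "my_table";
        return new VoidFn<String>() {
            @Override
            public void call(String str) {
                String unused = table + ":" + str;
            }
        };
    }
}
```

Here canSerialize(capturingContext()) fails while canSerialize(capturingPlainData()) succeeds, which mirrors why capturing plain values in a function passed to an RDD is fine while capturing the context is not.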








Re: Solving linear equations

2014-10-23 Thread Sean Owen
The 0 vector is a trivial solution. Is the data big, such that it
can't be computed on one machine? If so, I assume this system is
over-determined. You can use a decomposition to find a least-squares
solution, but the SVD is overkill and in any event distributed
decompositions don't exist in the project. You can solve it as a linear
regression, as Mr Das says.

If it's small enough to fit locally, you should just use a matrix
library to solve Ax = b with the QR decomposition or something, with
Breeze or Commons Math or Octave or R. Lots of options if it's
smallish.

On Thu, Oct 23, 2014 at 12:15 AM, Martin Enzinger
martin.enzin...@gmail.com wrote:
 Hi,

 I'm wondering how to use MLlib for solving equation systems following this
 pattern

 2*x1 + x2 + 3*x3 + ... + xn = 0
 x1 + 0*x2 + 3*x3 + ... + xn = 0
 ..
 ..
 0*x1 + x2 + 0*x3 + ... + xn = 0

 I definitely still have some reading to do to really understand the direct
 solving techniques, but at the current state of knowledge SVD could help
 me with this right?

 Can you point me to an example or a tutorial?

 best regards




Re: Is Spark streaming suitable for our architecture?

2014-10-23 Thread Jayant Shekhar
Hi Albert,

Have a couple of questions:

   - You mentioned near real-time. What exactly is your SLA for processing
   each document?
   - Which crawler are you using, and are you looking to bring Hadoop
   into your overall workflow? You might want to read up on how network
   traffic is minimized/managed on a Hadoop cluster, as you had run into
   network issues with your current architecture.

Thanks!





SparkSQL and columnar data

2014-10-23 Thread Marius Soutier
Hi guys,

another question: what’s the approach to working with column-oriented data,
i.e. data with more than 1000 columns? Using Parquet for this should be fine,
but how well does SparkSQL handle such a large number of columns? Is there a limit?
Should we use standard Spark instead?

Thanks for any insights,
- Marius





Re: Multitenancy in Spark - within/across spark context

2014-10-23 Thread Jianshi Huang
Upvote for the multitenancy requirement.

I'm also building a data analytics platform, and there'll be multiple users
running queries and computations simultaneously. One of the pain points is
controlling resource size. Users don't really know how many nodes they need,
so they always use as many as possible... The result is lots of wasted
resources in our YARN cluster.

A way to 1) allow multiple Spark contexts to share the same resources or 2)
add dynamic resource management for YARN mode is very much wanted.

Jianshi

On Thu, Oct 23, 2014 at 5:36 AM, Marcelo Vanzin van...@cloudera.com wrote:

 On Wed, Oct 22, 2014 at 2:17 PM, Ashwin Shankar
 ashwinshanka...@gmail.com wrote:
  That's not something you might want to do usually. In general, a
  SparkContext maps to a user application
 
  My question was basically this. In this page in the official doc, under
  Scheduling within an application section, it talks about multiuser and
  fair sharing within an app. How does multiuser within an application
  work(how users connect to an app,run their stuff) ? When would I want to
  use this ?

 I see. The way I read that page is that Spark supports all those
 scheduling options; but Spark doesn't give you the means to actually
 be able to submit jobs from different users to a running SparkContext
 hosted on a different process. For that, you'll need something like
 the job server that I referenced before, or write your own framework
 for supporting that.

 Personally, I'd use the information on that page when dealing with
 concurrent jobs in the same SparkContext, but still restricted to the
 same user. I'd avoid trying to create any application where a single
 SparkContext is trying to be shared by multiple users in any way.

  As far as I understand, this will cause executors to be killed, which
  means that Spark will start retrying tasks to rebuild the data that
  was held by those executors when needed.
 
  I basically wanted to find out if there were any gotchas related to
  preemption on Spark. Things like say half of an application's executors
  got preempted say while doing reduceByKey, will the application progress
  with the remaining resources/fair share ?

 Jobs should still make progress as long as at least one executor is
 available. The gotcha would be the one I mentioned, where Spark will
 fail your job after x executors failed, which might be a common
 occurrence when preemption is enabled. That being said, it's a
 configurable option, so you can set x to a very large value and your
 job should keep on chugging along.

 The options you'd want to take a look at are: spark.task.maxFailures
 and spark.yarn.max.executor.failures

 --
 Marcelo





-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Spark Cassandra Connector proper usage

2014-10-23 Thread Ashic Mahtab
I'm looking to use Spark for some ETL, which will mostly consist of update
statements (a column is a set that'll be appended to, so a simple insert is
likely not going to work). As such, it seems like issuing CQL queries to import
the data is the best option. Using the Spark Cassandra Connector, I see I can
do this:
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/1_connecting.md#connecting-manually-to-cassandra
Now I don't want to open a session and close it for every row in the source (am
I right in not wanting this? Usually, I have one session for the entire
process, and keep using that in normal apps). However, it says that the
connector is serializable, but the session is obviously not. So, wrapping the
whole import inside a single withSessionDo seems like it'll cause problems. I
was thinking of using something like this:

class CassandraStorage(conf: SparkConf) {
  val session = CassandraConnector(conf).openSession()
  def store(t: Thingy): Unit = {
    // session.execute cql goes here
  }
}

Is this a good approach? Do I need to worry about closing the session? Where /
how best would I do that? Any pointers are appreciated.

Thanks,
Ashic.
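
One pattern that sits between "one session per row" and "one session for the whole job" is one session per partition: open it at the start of the partition, reuse it for every row, and close it when the partition is done. The sketch below illustrates only that shape in plain Java. FakeSession and storePartition are hypothetical names, not the Cassandra driver's or the connector's API; in a real job the body of storePartition would be what runs inside foreachPartition, with the connector's own session handling in place of FakeSession.

```java
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;

public class PerPartitionSessionSketch {
    /** Hypothetical stand-in for a database session; it only counts calls. */
    public static class FakeSession implements AutoCloseable {
        public static final AtomicInteger opened = new AtomicInteger();
        public static final AtomicInteger closed = new AtomicInteger();
        public static final AtomicInteger executed = new AtomicInteger();

        public FakeSession() { opened.incrementAndGet(); }

        public void execute(String statement) { executed.incrementAndGet(); }

        @Override
        public void close() { closed.incrementAndGet(); }
    }

    /**
     * Work done for one partition: open once, reuse for each row, and always
     * close, even if a statement throws (try-with-resources guarantees this).
     */
    public static void storePartition(Iterator<String> rows) {
        try (FakeSession session = new FakeSession()) {
            while (rows.hasNext()) {
                session.execute("row: " + rows.next());
            }
        }
    }
}
```

With this shape the number of sessions opened equals the number of partitions rather than the number of rows, and nothing non-serializable needs to be shipped from the driver.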
  

Re: Is Spark streaming suitable for our architecture?

2014-10-23 Thread Albert Vila
Hi Jayant,

On 23 October 2014 11:14, Jayant Shekhar jay...@cloudera.com wrote:

 Hi Albert,

 Have a couple of questions:

- You mentioned near real-time. What exactly is your SLA for
processing each document?

 The lower the better :). Right now it's between 30s and 5m, but I would like
to have something stable around 1-2m if possible, taking into account that
the system should be able to scale to 50M - 100M documents.



- Which crawler are you using and are you looking to bring in Hadoop
into your overall workflow. You might want to read up on how network
traffic is minimized/managed on the Hadoop cluster - as you had run into
network issues with your current architecture.

 Everything is developed by us. The network issues were not related to the
crawler itself; they were related to the documents we were moving around
the system to be processed at each workflow stage. And yes, we are
currently researching whether we can introduce Spark Streaming to be able to
scale and execute all workflow stages, and use HDFS/Cassandra to store the
data.

Should we use the DStream persist function (if we treat every document as an
RDD) in order to reuse the same data, or is it better to create new
DStreams? At each step we add additional data to the document. For example,
in the language detection step, we begin with a document without a language, and
we output the document with a new language field.

Thanks






what's the best way to initialize an executor?

2014-10-23 Thread Darin McBeath
I have some code that I only need to be executed once per executor in my Spark
application.  My current approach is to do something like the following:

scala> xmlKeyPair.foreachPartition(i => XPathProcessor.init(ats,
Namespaces/NamespaceContext))

So, if I understand correctly, XPathProcessor.init will be called once per
partition.  Since I have 48 partitions for this RDD and 2 million documents,
that seems acceptable.  The downside is that I will likely have fewer than 48
executors (each executor will handle more than 1 partition), so
XPathProcessor.init would be called more than once on each executor.  I have
code in place to make sure this is not an issue.  But I was wondering if there
is a better way to accomplish something like this.
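
One way to express the "code in place to make sure this is not an issue" part is a static guard: since each executor is its own JVM, a flag held in a static field makes the setup run once per executor no matter how many partitions call it. The class below is a hypothetical sketch of such a guard, not the actual XPathProcessor. The counter exists only so the behaviour can be checked.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class OncePerJvmInit {
    private static final AtomicInteger initCount = new AtomicInteger();
    private static volatile boolean initialized = false;

    /** Safe to call from every partition; the setup body runs at most once per JVM. */
    public static void ensureInitialized() {
        if (!initialized) {                        // fast path once set up
            synchronized (OncePerJvmInit.class) {
                if (!initialized) {                // re-check under the lock
                    initCount.incrementAndGet();   // expensive setup would go here
                    initialized = true;
                }
            }
        }
    }

    /** Number of times the setup body actually ran in this JVM. */
    public static int initCount() {
        return initCount.get();
    }
}
```

Calling ensureInitialized() at the top of each foreachPartition body then costs only a volatile read after the first call on a given executor.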
Thanks.
Darin.



RE: Spark Cassandra Connector proper usage

2014-10-23 Thread Ashic Mahtab
Hi Gerard,
Thanks for the response. Here's the scenario:

The target Cassandra schema looks like this:

create table foo (
   id text primary key,
   bar int,
   things set<text>
)

The source in question is a SQL Server source providing the necessary data. The
source goes over the same id multiple times, adding things to the things set
each time. With inserts, it'll replace things with a new set of one element,
instead of appending that item. As such, the query

update foo set things = things + ? where id=?

solves the problem. If I had to stick with saveToCassandra, I'd have to read
in the existing row for each row, and then write it back. Since this is
happening in parallel on multiple machines, that would likely cause
discrepancies where a node reads and updates based on older values. Hence my
question about session management in order to issue custom update queries.

Thanks,
Ashic.

Date: Thu, 23 Oct 2014 14:27:47 +0200
Subject: Re: Spark Cassandra Connector proper usage
From: gerard.m...@gmail.com
To: as...@live.com

Ashic,

With the Spark-Cassandra connector you would typically create an RDD from the
source table, update what you need, filter out what you don't update and write
it back to Cassandra.

Kr, Gerard
  
  

Aggregation Error: org.apache.spark.sql.catalyst.errors.package$TreeNodeException:

2014-10-23 Thread arthur.hk.c...@gmail.com
Hi,

I got a $TreeNodeException and have a few questions:
Q1) How should I do aggregation in Spark? Can I use aggregation directly in 
SQL? Or
Q2) Should I use SQL to load the data into an RDD and then use Scala to do the 
aggregation?

Regards
Arthur


My SQL (good one, without aggregation): 
sqlContext.sql("""SELECT L_RETURNFLAG FROM LINEITEM WHERE  
L_SHIPDATE='1998-09-02'  GROUP  BY L_RETURNFLAG, L_LINESTATUS ORDER  BY 
L_RETURNFLAG, L_LINESTATUS""").collect().foreach(println);
[A]
[N]
[N]
[R]


My SQL (problem SQL, with aggregation):
sqlContext.sql("""SELECT L_RETURNFLAG, L_LINESTATUS, SUM(L_QUANTITY) AS SUM_QTY, 
SUM(L_EXTENDEDPRICE) AS SUM_BASE_PRICE, SUM(L_EXTENDEDPRICE * ( 1 - L_DISCOUNT 
)) AS SUM_DISC_PRICE, SUM(L_EXTENDEDPRICE * ( 1 - L_DISCOUNT ) * ( 1 + L_TAX )) 
AS SUM_CHARGE, AVG(L_QUANTITY) AS AVG_QTY, AVG(L_EXTENDEDPRICE) AS AVG_PRICE, 
AVG(L_DISCOUNT) AS AVG_DISC, COUNT(*) AS COUNT_ORDER  FROM LINEITEM WHERE  
L_SHIPDATE='1998-09-02'  GROUP  BY L_RETURNFLAG, L_LINESTATUS ORDER  BY 
L_RETURNFLAG, L_LINESTATUS""").collect().foreach(println);

14/10/23 20:38:31 INFO TaskSchedulerImpl: Removed TaskSet 18.0, whose tasks 
have all completed, from pool 
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: sort, tree:
Sort [l_returnflag#200 ASC,l_linestatus#201 ASC], true
 Exchange (RangePartitioning [l_returnflag#200 ASC,l_linestatus#201 ASC], 200)
  Aggregate false, [l_returnflag#200,l_linestatus#201], 
[l_returnflag#200,l_linestatus#201,SUM(PartialSum#216) AS 
sum_qty#181,SUM(PartialSum#217) AS sum_base_price#182,SUM(PartialSum#218) AS 
sum_disc_price#183,SUM(PartialSum#219) AS 
sum_charge#184,(CAST(SUM(PartialSum#220), DoubleType) / 
CAST(SUM(PartialCount#221L), DoubleType)) AS 
avg_qty#185,(CAST(SUM(PartialSum#222), DoubleType) / 
CAST(SUM(PartialCount#223L), DoubleType)) AS 
avg_price#186,(CAST(SUM(PartialSum#224), DoubleType) / 
CAST(SUM(PartialCount#225L), DoubleType)) AS 
avg_disc#187,SUM(PartialCount#226L) AS count_order#188L]
   Exchange (HashPartitioning [l_returnflag#200,l_linestatus#201], 200)
Aggregate true, [l_returnflag#200,l_linestatus#201], 
[l_returnflag#200,l_linestatus#201,COUNT(l_discount#195) AS 
PartialCount#225L,SUM(l_discount#195) AS PartialSum#224,COUNT(1) AS 
PartialCount#226L,SUM((l_extendedprice#194 * (1.0 - l_discount#195))) AS 
PartialSum#218,SUM(l_extendedprice#194) AS PartialSum#217,COUNT(l_quantity#193) 
AS PartialCount#221L,SUM(l_quantity#193) AS 
PartialSum#220,COUNT(l_extendedprice#194) AS 
PartialCount#223L,SUM(l_extendedprice#194) AS 
PartialSum#222,SUM(((l_extendedprice#194 * (1.0 - l_discount#195)) * (1.0 + 
l_tax#196))) AS PartialSum#219,SUM(l_quantity#193) AS PartialSum#216]
 Project 
[l_extendedprice#194,l_linestatus#201,l_discount#195,l_returnflag#200,l_tax#196,l_quantity#193]
  Filter (l_shipdate#197 = 1998-09-02)
   HiveTableScan 
[l_shipdate#197,l_extendedprice#194,l_linestatus#201,l_discount#195,l_returnflag#200,l_tax#196,l_quantity#193],
 (MetastoreRelation boc_12, lineitem, None), None

at 
org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:47)
at org.apache.spark.sql.execution.Sort.execute(basicOperators.scala:191)
at 
org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:85)
at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:15)
at $iwC$$iwC$$iwC.<init>(<console>:20)
at $iwC$$iwC.<init>(<console>:22)
at $iwC.<init>(<console>:24)
at <init>(<console>:26)
at .<init>(<console>:30)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
at 
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
at 
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
at 
org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814)
at 
org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624)
at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629)
at 
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954)
at 

Re: Spark Streaming Applications

2014-10-23 Thread Saiph Kappa
What is the application about? I couldn't find any proper description
regarding the purpose of killrweather ( I mean, other than just integrating
Spark with Cassandra). Do you know if the slides of that tutorial are
available somewhere?

Thanks!

On Wed, Oct 22, 2014 at 6:58 PM, Sameer Farooqui same...@databricks.com
wrote:

 Hi Saiph,

 Patrick McFadin and Helena Edelson from DataStax taught a tutorial at NYC
 Strata last week where they created a prototype Spark Streaming + Kafka
 application for time series data.

 You can see the code here:
 https://github.com/killrweather/killrweather


 On Tue, Oct 21, 2014 at 4:33 PM, Saiph Kappa saiph.ka...@gmail.com
 wrote:

 Hi,

 I have been trying to find a fairly complex application that makes use of
 the Spark Streaming framework. I checked public github repos but the
 examples I found were too simple, only comprising simple operations like
 counters and sums. On the Spark summit website, I could find very
 interesting projects, however no source code was available.

 Where can I find non-trivial spark streaming application code? Is it that
 difficult?

 Thanks.





Re: Aggregation Error: org.apache.spark.sql.catalyst.errors.package$TreeNodeException:

2014-10-23 Thread Yin Huai
Hello Arthur,

You can do aggregations in SQL. How did you create LINEITEM?
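
For comparison, the second route from the question (aggregating in Scala after loading the rows) would have roughly the shape below. This is only a sketch on plain Scala collections, not on an RDD or SchemaRDD; the LineItem case class, its field names and the sample rows are invented for illustration and cover just a few of the LINEITEM columns.

```scala
object AggregationDemo {
  // Hypothetical subset of the LINEITEM columns; not the real schema.
  final case class LineItem(returnFlag: String, lineStatus: String,
                            quantity: Double, extendedPrice: Double, discount: Double)

  final case class Summary(sumQty: Double, sumDiscPrice: Double,
                           avgQty: Double, countOrder: Long)

  // Rough equivalent of GROUP BY returnFlag, lineStatus with SUM / AVG / COUNT,
  // followed by ORDER BY on the same two keys.
  def summarize(items: Seq[LineItem]): Seq[((String, String), Summary)] =
    items.groupBy(i => (i.returnFlag, i.lineStatus)).toSeq.map { case (key, group) =>
      val sumQty = group.map(_.quantity).sum
      val sumDiscPrice = group.map(i => i.extendedPrice * (1 - i.discount)).sum
      key -> Summary(sumQty, sumDiscPrice, sumQty / group.size, group.size.toLong)
    }.sortBy(_._1)

  def main(args: Array[String]): Unit = {
    // Made-up rows, only to exercise the function.
    val rows = Seq(
      LineItem("N", "O", 10.0, 100.0, 0.5),
      LineItem("N", "O", 20.0, 200.0, 0.5),
      LineItem("A", "F", 4.0, 40.0, 0.0)
    )
    summarize(rows).foreach(println)
  }
}
```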

Thanks,

Yin

On Thu, Oct 23, 2014 at 8:54 AM, arthur.hk.c...@gmail.com 
arthur.hk.c...@gmail.com wrote:

 Hi,

 I got a $TreeNodeException and have a few questions:
 Q1) How should I do aggregation in Spark? Can I use aggregation directly
 in SQL? Or
 Q2) Should I use SQL to load the data into an RDD and then use Scala to do the
 aggregation?

 Regards
 Arthur


 My SQL (good one, without aggregation):
 sqlContext.sql("""SELECT L_RETURNFLAG FROM LINEITEM WHERE
 L_SHIPDATE='1998-09-02'  GROUP  BY L_RETURNFLAG, L_LINESTATUS ORDER  BY
 L_RETURNFLAG, L_LINESTATUS""").collect().foreach(println);
 [A]
 [N]
 [N]
 [R]


 My SQL (problem SQL, with aggregation):
 sqlContext.sql("""SELECT L_RETURNFLAG, L_LINESTATUS, SUM(L_QUANTITY) AS
 SUM_QTY, SUM(L_EXTENDEDPRICE) AS SUM_BASE_PRICE, SUM(L_EXTENDEDPRICE * ( 1
 - L_DISCOUNT )) AS SUM_DISC_PRICE, SUM(L_EXTENDEDPRICE * ( 1 - L_DISCOUNT )
 * ( 1 + L_TAX )) AS SUM_CHARGE, AVG(L_QUANTITY) AS AVG_QTY,
 AVG(L_EXTENDEDPRICE) AS AVG_PRICE, AVG(L_DISCOUNT) AS AVG_DISC, COUNT(*) AS
 COUNT_ORDER  FROM LINEITEM WHERE  L_SHIPDATE='1998-09-02'  GROUP  BY
 L_RETURNFLAG, L_LINESTATUS ORDER  BY L_RETURNFLAG,
 L_LINESTATUS""").collect().foreach(println);

 14/10/23 20:38:31 INFO TaskSchedulerImpl: Removed TaskSet 18.0, whose
 tasks have all completed, from pool
 org.apache.spark.sql.catalyst.errors.package$TreeNodeException: sort, tree:
 Sort [l_returnflag#200 ASC,l_linestatus#201 ASC], true
  Exchange (RangePartitioning [l_returnflag#200 ASC,l_linestatus#201 ASC],
 200)
   Aggregate false, [l_returnflag#200,l_linestatus#201],
 [l_returnflag#200,l_linestatus#201,SUM(PartialSum#216) AS
 sum_qty#181,SUM(PartialSum#217) AS sum_base_price#182,SUM(PartialSum#218)
 AS sum_disc_price#183,SUM(PartialSum#219) AS
 sum_charge#184,(CAST(SUM(PartialSum#220), DoubleType) /
 CAST(SUM(PartialCount#221L), DoubleType)) AS
 avg_qty#185,(CAST(SUM(PartialSum#222), DoubleType) /
 CAST(SUM(PartialCount#223L), DoubleType)) AS
 avg_price#186,(CAST(SUM(PartialSum#224), DoubleType) /
 CAST(SUM(PartialCount#225L), DoubleType)) AS
 avg_disc#187,SUM(PartialCount#226L) AS count_order#188L]
Exchange (HashPartitioning [l_returnflag#200,l_linestatus#201], 200)
 Aggregate true, [l_returnflag#200,l_linestatus#201],
 [l_returnflag#200,l_linestatus#201,COUNT(l_discount#195) AS
 PartialCount#225L,SUM(l_discount#195) AS PartialSum#224,COUNT(1) AS
 PartialCount#226L,SUM((l_extendedprice#194 * (1.0 - l_discount#195))) AS
 PartialSum#218,SUM(l_extendedprice#194) AS
 PartialSum#217,COUNT(l_quantity#193) AS
 PartialCount#221L,SUM(l_quantity#193) AS
 PartialSum#220,COUNT(l_extendedprice#194) AS
 PartialCount#223L,SUM(l_extendedprice#194) AS
 PartialSum#222,SUM(((l_extendedprice#194 * (1.0 - l_discount#195)) * (1.0 +
 l_tax#196))) AS PartialSum#219,SUM(l_quantity#193) AS PartialSum#216]
  Project
 [l_extendedprice#194,l_linestatus#201,l_discount#195,l_returnflag#200,l_tax#196,l_quantity#193]
   Filter (l_shipdate#197 = 1998-09-02)
HiveTableScan
 [l_shipdate#197,l_extendedprice#194,l_linestatus#201,l_discount#195,l_returnflag#200,l_tax#196,l_quantity#193],
 (MetastoreRelation boc_12, lineitem, None), None

 at
 org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:47)
 at org.apache.spark.sql.execution.Sort.execute(basicOperators.scala:191)
 at
 org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:85)
 at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438)
 at $iwC$$iwC$$iwC$$iwC.<init>(<console>:15)
 at $iwC$$iwC$$iwC.<init>(<console>:20)
 at $iwC$$iwC.<init>(<console>:22)
 at $iwC.<init>(<console>:24)
 at <init>(<console>:26)
 at .<init>(<console>:30)
 at .<clinit>(<console>)
 at .<init>(<console>:7)
 at .<clinit>(<console>)
 at $print(<console>)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
 org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
 at
 org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
 at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
 at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
 at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
 at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814)
 at
 org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859)
 at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771)
 at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616)
 at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624)
 at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629)
 at
 org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954)
 at
 

Re: Spark Cassandra Connector proper usage

2014-10-23 Thread Gerard Maas
Hi Ashic,

At the moment I see two options:

1) You could use the CassandraConnector object to execute your specialized
query. The recommended pattern is to do that within a
rdd.foreachPartition(...) in order to amortize DB connection setup over the
number of elements in one partition. Something like this:

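val sparkContext = ???
val cassandraConnector = CassandraConnector(conf)
val dataRdd = ??? // I assume this is the source of data
val rddThingById = dataRdd.map(elem => transformToIdByThing(elem))
rddThingById.foreachPartition { partition =>
  // one session per partition, reused for every record in it
  cassandraConnector.withSessionDo { session =>
    // bind order follows the placeholders: the set element(s) first, then the id;
    // record.thing has to bind as a set<text> value for "things + ?" to work
    partition.foreach(record => session.execute(
      "update foo set things = things + ? where id=?", record.thing, record.id))
  }
}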

2) You could change your datamodel slightly in order to avoid the update
operation.
Actually, the Cassandra representation of a set is nothing more than a
column -> timestamp mapping, where the column name is an element of the set.
So Set(a,b,c) = Column(a) -> ts, Column(b) -> ts, Column(c) -> ts

So, if you desugar your datamodel, you could use something like:
create table foo (
   id text,
   bar int,
   things text,
   ts timestamp,
   primary key ((id, bar), things)
)

And your Spark process would be reduced to:
val sparkContext = ???
val dataRdd = ??? // I assume this is the source of data
dataRdd.map(elem => transformToIdBarThingByTimeStamp(elem))
  .saveToCassandra("ks", "foo", SomeColumns("id", "bar", "things", "ts"))
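
The amortization argument behind option 1 can be sketched without a cluster. The block below is a minimal, self-contained illustration under stated assumptions: FakeConnector and FakeSession are hypothetical stand-ins (not the connector's classes), partitions are plain Seqs, and the only point is that the number of sessions opened tracks the number of partitions rather than the number of records.

```scala
object SessionPerPartitionDemo {
  // Hypothetical stand-in for a database session; it only records statements.
  final class FakeSession {
    val executed = scala.collection.mutable.ArrayBuffer.empty[String]
    def execute(statement: String): Unit = executed += statement
    def close(): Unit = ()
  }

  // Hypothetical stand-in for a connector; counts how many sessions are opened.
  final class FakeConnector {
    var sessionsOpened = 0
    // Loan pattern: open a session, lend it to `body`, always close it afterwards.
    def withSessionDo[T](body: FakeSession => T): T = {
      sessionsOpened += 1
      val session = new FakeSession
      try body(session) finally session.close()
    }
  }

  // One session per partition, reused for every (id, thing) record in it.
  // Returns the number of statements issued.
  def writeAll(connector: FakeConnector, partitions: Seq[Seq[(String, String)]]): Int = {
    var statements = 0
    partitions.foreach { partition =>
      connector.withSessionDo { session =>
        partition.foreach { case (id, thing) =>
          session.execute(s"update foo set things = things + {'$thing'} where id = '$id'")
          statements += 1
        }
      }
    }
    statements
  }

  def main(args: Array[String]): Unit = {
    val connector = new FakeConnector
    val partitions = Seq(
      Seq("id1" -> "a", "id1" -> "b", "id2" -> "c"),
      Seq("id3" -> "d", "id3" -> "e")
    )
    val n = writeAll(connector, partitions)
    println(s"$n statements over ${connector.sessionsOpened} sessions")
  }
}
```

Opening the session inside the per-partition closure also sidesteps the serialization concern: only the connector-like object crosses the closure boundary, never an open session.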


Hope this helps.

-kr, Gerard.











On Thu, Oct 23, 2014 at 2:48 PM, Ashic Mahtab as...@live.com wrote:

 Hi Gerard,
 Thanks for the response. Here's the scenario:

 The target cassandra schema looks like this:

 create table foo (
id text primary key,
bar int,
 things set<text>
 )

 The source in question is a Sql Server source providing the necessary
 data. The source goes over the same id multiple times adding things to
 the things set each time. With inserts, it'll replace things with a new
 set of one element, instead of appending that item. As such, the query

 update foo set things = things + ? where id=?

 solves the problem. If I had to stick with saveToCassandra, I'd have to
 read in the existing row for each row, and then write it back. Since this
 is happening in parallel on multiple machines, that would likely cause
 discrepancies where a node will read and update to older values. Hence my
 question about session management in order to issue custom update queries.

 Thanks,
 Ashic.

 --
 Date: Thu, 23 Oct 2014 14:27:47 +0200
 Subject: Re: Spark Cassandra Connector proper usage
 From: gerard.m...@gmail.com
 To: as...@live.com


 Ashic,
 With the Spark-cassandra connector you would typically create an RDD from
 the source table, update what you need,  filter out what you don't update
 and write it back to Cassandra.

 Kr, Gerard
 On Oct 23, 2014 1:21 PM, Ashic Mahtab as...@live.com wrote:

 I'm looking to use spark for some ETL, which will mostly consist of
 update statements (a column is a set, that'll be appended to, so a simple
 insert is likely not going to work). As such, it seems like issuing CQL
 queries to import the data is the best option. Using the Spark Cassandra
 Connector, I see I can do this:



 https://github.com/datastax/spark-cassandra-connector/blob/master/doc/1_connecting.md#connecting-manually-to-cassandra

 Now I don't want to open a session and close it for every row in the
 source (am I right in not wanting this? Usually, I have one session for the
 entire process, and keep using that in normal apps). However, it says
 that the connector is serializable, but the session is obviously not. So,
 wrapping the whole import inside a single withSessionDo seems like it'll
 cause problems. I was thinking of using something like this:


 class CassandraStorage(conf:SparkConf) {
   val session = CassandraConnector(conf).openSession()
   def store (t:Thingy) : Unit = {
 //session.execute cql goes here
   }
 }


 Is this a good approach? Do I need to worry about closing the session?
 Where / how best would I do that? Any pointers are appreciated.


 Thanks,

 Ashic.




RE: Spark Cassandra Connector proper usage

2014-10-23 Thread Ashic Mahtab
Hi Gerard,
I've gone with option 1, and seems to be working well. Option 2 is also quite 
interesting. Thanks for your help in this.

Regards,
Ashic.

From: gerard.m...@gmail.com
Date: Thu, 23 Oct 2014 17:07:56 +0200
Subject: Re: Spark Cassandra Connector proper usage
To: as...@live.com
CC: user@spark.apache.org

Hi Ashic,
At the moment I see two options:

1) You could use the CassandraConnector object to execute your specialized 
query. The recommended pattern is to do that within a rdd.foreachPartition(...) 
in order to amortize DB connection setup over the number of elements in one 
partition. Something like this:

val sparkContext = ???
val cassandraConnector = CassandraConnector(conf)
val dataRdd = ??? // I assume this is the source of data
val rddThingById = dataRdd.map(elem => transformToIdByThing(elem))
rddThingById.foreachPartition { partition =>
  cassandraConnector.withSessionDo { session =>
    // bind order follows the placeholders: the set element(s) first, then the id
    partition.foreach(record => session.execute(
      "update foo set things = things + ? where id=?", record.thing, record.id))
  }
}

2) You could change your datamodel slightly in order to avoid the update 
operation.
Actually, the Cassandra representation of a set is nothing more than a 
column -> timestamp mapping, where the column name is an element of the set.
So Set(a,b,c) = Column(a) -> ts, Column(b) -> ts, Column(c) -> ts

So, if you desugar your datamodel, you could use something like:
create table foo (
   id text,
   bar int,
   things text,
   ts timestamp,
   primary key ((id, bar), things)
)

And your Spark process would be reduced to:
val sparkContext = ???
val dataRdd = ??? // I assume this is the source of data
dataRdd.map(elem => transformToIdBarThingByTimeStamp(elem))
  .saveToCassandra("ks", "foo", SomeColumns("id", "bar", "things", "ts"))
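
As a rough illustration of why the desugared model in option 2 needs no update statement, the sketch below models the table as an in-memory Map keyed like `primary key ((id, bar), things)`. It is a toy, not Cassandra: DesugaredSetDemo, insert and thingsFor are hypothetical names, and the timestamps are arbitrary numbers.

```scala
object DesugaredSetDemo {
  // Key mirrors primary key ((id, bar), things); the value is the write timestamp.
  type Table = Map[(String, Int, String), Long]

  // A plain insert: a new `things` value adds a row, a repeated one
  // only refreshes the timestamp of the existing row.
  def insert(table: Table, id: String, bar: Int, thing: String, ts: Long): Table =
    table.updated((id, bar, thing), ts)

  // Reading the "set" back is a scan over the rows sharing (id, bar).
  def thingsFor(table: Table, id: String, bar: Int): Set[String] =
    table.keys.collect { case (`id`, `bar`, thing) => thing }.toSet

  def main(args: Array[String]): Unit = {
    val empty: Table = Map.empty
    val writes = Seq(("id1", 1, "a", 1L), ("id1", 1, "b", 2L), ("id1", 1, "a", 3L))
    val table = writes.foldLeft(empty) { case (t, (id, bar, th, ts)) => insert(t, id, bar, th, ts) }
    println(thingsFor(table, "id1", 1))
  }
}
```

Because each element is its own row, independent writers never overwrite each other's elements, which is the property the set append was providing.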

Hope this helps.
-kr, Gerard.

 



 
 
On Thu, Oct 23, 2014 at 2:48 PM, Ashic Mahtab as...@live.com wrote:



Hi Gerard,
Thanks for the response. Here's the scenario:

The target cassandra schema looks like this:

create table foo (
   id text primary key,
   bar int,
   things set<text>
)

The source in question is a Sql Server source providing the necessary data. The 
source goes over the same id multiple times adding things to the things set 
each time. With inserts, it'll replace things with a new set of one element, 
instead of appending that item. As such, the query

update foo set things = things + ? where id=?

solves the problem. If I had to stick with saveToCassandra, I'd have to read 
in the existing row for each row, and then write it back. Since this is 
happening in parallel on multiple machines, that would likely cause 
discrepancies where a node will read and update to older values. Hence my 
question about session management in order to issue custom update queries.

Thanks,
Ashic.

Date: Thu, 23 Oct 2014 14:27:47 +0200
Subject: Re: Spark Cassandra Connector proper usage
From: gerard.m...@gmail.com
To: as...@live.com

Ashic,

With the Spark-cassandra connector you would typically create an RDD from the 
source table, update what you need,  filter out what you don't update and write 
it back to Cassandra.
Kr, Gerard
On Oct 23, 2014 1:21 PM, Ashic Mahtab as...@live.com wrote:



I'm looking to use spark for some ETL, which will mostly consist of update 
statements (a column is a set, that'll be appended to, so a simple insert is 
likely not going to work). As such, it seems like issuing CQL queries to import 
the data is the best option. Using the Spark Cassandra Connector, I see I can 
do this:
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/1_connecting.md#connecting-manually-to-cassandra
Now I don't want to open a session and close it for every row in the source (am 
I right in not wanting this? Usually, I have one session for the entire 
process, and keep using that in normal apps). However, it says that the 
connector is serializable, but the session is obviously not. So, wrapping the 
whole import inside a single withSessionDo seems like it'll cause problems. I 
was thinking of using something like this:
class CassandraStorage(conf:SparkConf) {
  val session = CassandraConnector(conf).openSession()
  def store (t:Thingy) : Unit = {
//session.execute cql goes here
  }
}

Is this a good approach? Do I need to worry about closing the session? Where / 
how best would I do that? Any pointers are appreciated.
Thanks,Ashic.
  
  

  

Re: what's the best way to initialize an executor?

2014-10-23 Thread Sean Owen
It sounds like your code already does its initialization at most once
per JVM, and that's about as good as it gets. Each partition asks for
init in a thread-safe way and the first request succeeds.
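
A minimal sketch of the "at most once per JVM" idea, assuming nothing about the real XPathProcessor: the Processor object below is a hypothetical stand-in, and the counter exists only to make the behaviour observable. It relies on the fact that a Scala `lazy val` is initialized at most once, in a thread-safe way.

```scala
import java.util.concurrent.atomic.AtomicInteger

object OncePerJvmDemo {
  // Counts how often the expensive setup really runs (demonstration only).
  val initRuns = new AtomicInteger(0)

  // Hypothetical processor standing in for something like XPathProcessor.
  object Processor {
    // A lazy val body runs at most once per JVM, even under concurrent access.
    private lazy val ready: Boolean = {
      initRuns.incrementAndGet() // the expensive setup would go here
      true
    }
    // Safe to call from every partition; only the first call does the work.
    def init(): Unit = if (!ready) throw new IllegalStateException("init failed")
  }

  def main(args: Array[String]): Unit = {
    // 48 "partitions" ask for init from separate threads.
    val threads = (1 to 48).map(_ => new Thread(() => Processor.init()))
    threads.foreach(_.start())
    threads.foreach(_.join())
    println(s"setup ran ${initRuns.get()} time(s)")
  }
}
```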

On Thu, Oct 23, 2014 at 1:41 PM, Darin McBeath
ddmcbe...@yahoo.com.invalid wrote:
 I have some code that I only need to be executed once per executor in my
 spark application.  My current approach is to do something like the
 following:

 scala> xmlKeyPair.foreachPartition(i => XPathProcessor.init("ats",
 "Namespaces/NamespaceContext"))

 So, If I understand correctly, the XPathProcessor.init will be called once
 per partition.  Since I have 48 partitions for this RDD and 2 million
 documents, that seems acceptable.  The downside is that I likely will have
 fewer executors than 48 (each executor will handle more than 1 partition) so
 the executor would be called more than once with XPathProcessor.init.  I
 have code in place to make sure this is not an issue.  But, I was wondering
 if there is a better way to accomplish something like this.

 Thanks.

 Darin.






Re: How to set hadoop native library path in spark-1.1

2014-10-23 Thread Christophe Préaud
Hi,

Try the --driver-library-path option of spark-submit, e.g.:

/opt/spark/bin/spark-submit --driver-library-path /opt/hadoop/lib/native (...)

Regards,
Christophe.

On 21/10/2014 20:44, Pradeep Ch wrote:
 Hi all,

 Can anyone tell me how to set the native library path in Spark.

 Right now I am setting it using the SPARK_LIBRARY_PATH environment variable 
 in spark-env.sh. But still no success.

 I am still seeing this in spark-shell.

 NativeCodeLoader: Unable to load native-hadoop library for your platform... 
 using builtin-java classes where applicable


 Thanks,
 Pradeep


Kelkoo SAS
Simplified joint-stock company (Société par Actions Simplifiée)
Share capital of € 4,168,964.30
Registered office: 8, rue du Sentier 75002 Paris
425 093 069 RCS Paris

This message and its attachments are confidential and intended solely for 
their addressees. If you are not the intended recipient of this message, 
please destroy it and notify the sender.




Re: Multitenancy in Spark - within/across spark context

2014-10-23 Thread Marcelo Vanzin
You may want to take a look at https://issues.apache.org/jira/browse/SPARK-3174.

On Thu, Oct 23, 2014 at 2:56 AM, Jianshi Huang jianshi.hu...@gmail.com wrote:
 Upvote for the multitenancy requirement.

 I'm also building a data analytics platform, and there'll be multiple users
 running queries and computations simultaneously. One of the pain points is
 control of resource size. Users don't really know how many nodes they need,
 so they always use as many as possible... The result is lots of wasted
 resources in our YARN cluster.

 A way to 1) allow multiple Spark contexts to share the same resources or 2)
 add dynamic resource management for YARN mode is very much wanted.

 Jianshi

 On Thu, Oct 23, 2014 at 5:36 AM, Marcelo Vanzin van...@cloudera.com wrote:

 On Wed, Oct 22, 2014 at 2:17 PM, Ashwin Shankar
 ashwinshanka...@gmail.com wrote:
  That's not something you might want to do usually. In general, a
  SparkContext maps to a user application
 
  My question was basically this. In this page in the official doc, under
  Scheduling within an application section, it talks about multiuser and
  fair sharing within an app. How does multiuser within an application
  work(how users connect to an app,run their stuff) ? When would I want to
  use
  this ?

 I see. The way I read that page is that Spark supports all those
 scheduling options; but Spark doesn't give you the means to actually
 be able to submit jobs from different users to a running SparkContext
 hosted on a different process. For that, you'll need something like
 the job server that I referenced before, or write your own framework
 for supporting that.

 Personally, I'd use the information on that page when dealing with
 concurrent jobs in the same SparkContext, but still restricted to the
 same user. I'd avoid trying to create any application where a single
 SparkContext is trying to be shared by multiple users in any way.

  As far as I understand, this will cause executors to be killed, which
  means that Spark will start retrying tasks to rebuild the data that
  was held by those executors when needed.
 
  I basically wanted to find out if there were any gotchas related to
  preemption on Spark. Things like say half of an application's executors
  got
  preempted say while doing reduceByKey, will the application progress
  with
  the remaining resources/fair share ?

 Jobs should still make progress as long as at least one executor is
 available. The gotcha would be the one I mentioned, where Spark will
 fail your job after x executors failed, which might be a common
 occurrence when preemption is enabled. That being said, it's a
 configurable option, so you can set x to a very large value and your
 job should keep on chugging along.

 The options you'd want to take a look at are: spark.task.maxFailures
 and spark.yarn.max.executor.failures
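
 One way to pass these two options along is sketched below. The property names are the ones mentioned above; the values are arbitrary placeholders, not recommendations, and the helper toSubmitArgs is a made-up name. It assumes a spark-submit that accepts `--conf key=value` pairs.

```scala
object FailureToleranceConf {
  // Property names come from the thread; the values are placeholders only.
  val settings: Seq[(String, String)] = Seq(
    "spark.task.maxFailures" -> "100",
    "spark.yarn.max.executor.failures" -> "1000"
  )

  // Renders settings as `--conf key=value` arguments for a spark-submit command line.
  def toSubmitArgs(conf: Seq[(String, String)]): Seq[String] =
    conf.flatMap { case (key, value) => Seq("--conf", s"$key=$value") }

  def main(args: Array[String]): Unit =
    println(toSubmitArgs(settings).mkString(" "))
}
```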

 --
 Marcelo





 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/



-- 
Marcelo




Re: Setting only master heap

2014-10-23 Thread Andrew Or
Yeah, as Sameer commented, there is unfortunately not an equivalent
`SPARK_MASTER_MEMORY` that you can set. You can work around this by
starting the master and the slaves separately with different settings of
SPARK_DAEMON_MEMORY each time.

AFAIK there haven't been any major changes in the standalone master in
1.1.0, so I don't see an immediate explanation for what you're observing.
In general the Spark master doesn't use that much memory, and even if there
are many applications it will discard the old ones appropriately, so unless
you have a ton (like thousands) of concurrently running applications
connecting to it there's little likelihood for it to OOM. At least that's
my understanding.

-Andrew

2014-10-22 15:51 GMT-07:00 Sameer Farooqui same...@databricks.com:

 Hi Keith,

 Would be helpful if you could post the error message.

 Are you running Spark in Standalone mode or with YARN?

 In general, the Spark Master is only used for scheduling and it should be
 fine with the default setting of 512 MB RAM.

 Is it actually the Spark Driver's memory that you intended to change?



 *++ If in Standalone mode ++*
 You're right that SPARK_DAEMON_MEMORY sets the memory to allocate to the
 Spark Master, Worker and even HistoryServer daemons together.

 SPARK_WORKER_MEMORY is slightly confusing. In Standalone mode, it is the
 amount of memory that a worker advertises as available for drivers to
 launch executors. The sum of the memory used by executors spawned from a
 worker cannot exceed SPARK_WORKER_MEMORY.

 Unfortunately, I'm not aware of a way to set the memory for Master and
 Worker individually, other than launching them manually. You can also try
 setting the config differently on each machine's spark-env.sh file.
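
 The SPARK_WORKER_MEMORY constraint above is simple arithmetic; a small sketch, with made-up sizes and ignoring every other limit (cores, overheads):

```scala
object WorkerMemoryDemo {
  // Largest number of equally sized executors whose combined memory
  // stays within what the worker advertises (SPARK_WORKER_MEMORY).
  def maxExecutors(workerMemoryMb: Int, executorMemoryMb: Int): Int = {
    require(executorMemoryMb > 0, "executor memory must be positive")
    workerMemoryMb / executorMemoryMb
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical sizes: a worker advertising 16 GB and executors of 6 GB each.
    println(maxExecutors(workerMemoryMb = 16384, executorMemoryMb = 6144))
  }
}
```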


 *++ If in YARN mode ++*
 In YARN, there is no setting for SPARK_DAEMON_MEMORY. Therefore this is
 only in the Standalone documentation.

 Remember that in YARN mode there is no Spark Worker; instead the YARN
 NodeManagers launch the Executors. And in YARN, there is no need to run a
 Spark Master JVM (since the YARN ResourceManager takes care of the
 scheduling).

 So, with YARN use SPARK_EXECUTOR_MEMORY to set the Executor's memory. And
 use SPARK_DRIVER_MEMORY to set the Driver's memory.

 Just an FYI - for compatibility's sake, even in YARN mode there is a
 setting for SPARK_WORKER_MEMORY, but this has been deprecated. If you do
 set it, it just does the same thing as setting SPARK_EXECUTOR_MEMORY would
 have done.


 - Sameer


 On Wed, Oct 22, 2014 at 1:46 PM, Keith Simmons ke...@pulse.io wrote:

 We've been getting some OOMs from the spark master since upgrading to
 Spark 1.1.0.  I've found SPARK_DAEMON_MEMORY, but that also seems to
 increase the worker heap, which as far as I know is fine.  Is there any
 setting which *only* increases the master heap size?

 Keith





Re: Shuffle issues in the current master

2014-10-23 Thread Andrew Or
To add to Aaron's response, `spark.shuffle.consolidateFiles` only applies
to hash-based shuffle, so you shouldn't have to set it for sort-based
shuffle. And yes, since you changed neither `spark.shuffle.compress` nor
`spark.shuffle.spill.compress` you can't possibly have run into what #2890
fixes.

I'm assuming you're running master? Was it before or after this commit:
https://github.com/apache/spark/commit/6b79bfb42580b6bd4c4cd99fb521534a94150693
?

-Andrew

2014-10-22 16:37 GMT-07:00 Aaron Davidson ilike...@gmail.com:

 You may be running into this issue:
 https://issues.apache.org/jira/browse/SPARK-4019

 You could check by having 2000 or fewer reduce partitions.

 On Wed, Oct 22, 2014 at 1:48 PM, DB Tsai dbt...@dbtsai.com wrote:

 PS, sorry for spamming the mailing list. Based on my knowledge, both
 spark.shuffle.spill.compress and spark.shuffle.compress default to
 true, so in theory, we should not run into this issue if we don't
 change any setting. Is there any other bug we are running into?

 Thanks.

 Sincerely,

 DB Tsai
 ---
 My Blog: https://www.dbtsai.com
 LinkedIn: https://www.linkedin.com/in/dbtsai


 On Wed, Oct 22, 2014 at 1:37 PM, DB Tsai dbt...@dbtsai.com wrote:
  Or can it be solved by setting both of the following settings to true
 for now?
 
  spark.shuffle.spill.compress true
  spark.shuffle.compress true
 
  Sincerely,
 
  DB Tsai
  ---
  My Blog: https://www.dbtsai.com
  LinkedIn: https://www.linkedin.com/in/dbtsai
 
 
  On Wed, Oct 22, 2014 at 1:34 PM, DB Tsai dbt...@dbtsai.com wrote:
  It seems that this issue should be addressed by
  https://github.com/apache/spark/pull/2890 ? Am I right?
 
  Sincerely,
 
  DB Tsai
  ---
  My Blog: https://www.dbtsai.com
  LinkedIn: https://www.linkedin.com/in/dbtsai
 
 
  On Wed, Oct 22, 2014 at 11:54 AM, DB Tsai dbt...@dbtsai.com wrote:
  Hi all,
 
  With SPARK-3948, the exception in Snappy PARSING_ERROR is gone, but
  I've another exception now. I've no clue about what's going on; has
  anyone run into a similar issue? Thanks.
 
  This is the configuration I use.
  spark.rdd.compress true
  spark.shuffle.consolidateFiles true
  spark.shuffle.manager SORT
  spark.akka.frameSize 128
  spark.akka.timeout  600
  spark.core.connection.ack.wait.timeout  600
  spark.core.connection.auth.wait.timeout 300
 
 
 java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2325)
 
  
 java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:2794)
 
  java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:801)
  java.io.ObjectInputStream.<init>(ObjectInputStream.java:299)
 
  
 org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:57)
 
  
 org.apache.spark.serializer.JavaDeserializationStream.<init>(JavaSerializer.scala:57)
 
  
 org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:95)
 
  
 org.apache.spark.storage.BlockManager.getLocalShuffleFromDisk(BlockManager.scala:351)
 
  
 org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$fetchLocalBlocks$1$$anonfun$apply$4.apply(ShuffleBlockFetcherIterator.scala:196)
 
  
 org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$fetchLocalBlocks$1$$anonfun$apply$4.apply(ShuffleBlockFetcherIterator.scala:196)
 
  
 org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:243)
 
  
 org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:52)
  scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
 
  
 org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
 
  
 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
 
  org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:89)
 
  
 org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:44)
  org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
 
  org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
  org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
  org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
 
  org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
  org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
 
  org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 
  org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
  org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
 
  org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
 
  org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
  org.apache.spark.scheduler.Task.run(Task.scala:56)
 
  org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:181)
 
  
 

Re: How to access objects declared and initialized outside the call() method of JavaRDD

2014-10-23 Thread Localhost shell
Bang On Sean

Before sending the issue mail, I was able to remove the compilation error
by making it final but then got the
Caused by: java.io.NotSerializableException:
org.apache.spark.api.java.JavaSparkContext   (As you mentioned)

Now regarding your suggestion of changing the business logic,
1. *Is the current approach possible if I write the code in Scala?* I
think probably not but wanted to check with you.

2. Brief steps of what the code is doing:

   1. Get raw sessions data from the datastore (C*)
   2. Process the raw sessions data
   3. Iterate over the processed data (derived from #2) and fetch the
      previously aggregated data from the store for those rowkeys.
      Add the values from this batch to the previous batch values
   4. Save back the updated values

   *This github gist might explain more:
   https://gist.github.com/rssvihla/6577359860858ccb0b33 and it does a
   similar thing in Scala.*
I am trying to achieve a similar thing in Java using Spark Batch with
C* as the datastore.

I have attached the Java code file to give you some code details. (In case
I was not able to explain the problem well, the code will be handy.)


The reason I am fetching only selective data (that I will update later)
is that Cassandra doesn't provide range queries, so I thought fetching
complete data might be expensive.

It would be great if you could share your thoughts.

On Thu, Oct 23, 2014 at 1:48 AM, Sean Owen so...@cloudera.com wrote:

 In Java, javaSparkContext would have to be declared final in order for
 it to be accessed inside an inner class like this. But this would
 still not work as the context is not serializable. You  should rewrite
 this so you are not attempting to use the Spark context inside  an
 RDD.

 On Thu, Oct 23, 2014 at 8:46 AM, Localhost shell
 universal.localh...@gmail.com wrote:
  Hey All,
 
  I am unable to access objects declared and initialized outside the call()
  method of JavaRDD.
 
  In the below code snippet, call() method makes a fetch call to C* but
 since
  javaSparkContext is defined outside the call method scope so compiler
 give a
  compilation error.
 
  stringRdd.foreach(new VoidFunction<String>() {
      @Override
      public void call(String str) throws Exception {
          JavaRDD<String> vals =
              javaFunctions(javaSparkContext).cassandraTable(schema, table,
                  String.class)
              .select(val);
      }
  });
 
  In other languages I have used closure to do this but not able to achieve
  the same here.
 
  Can someone suggest how to achieve this in the current code context?
 
 
  --Unilocal
 
 
 




-- 
--Unilocal


PocAppNew.java
Description: Binary data

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Re: How to access objects declared and initialized outside the call() method of JavaRDD

2014-10-23 Thread Jayant Shekhar
+1 to Sean.

Is it possible to rewrite your code so that it does not use the SparkContext
inside an RDD operation? Or why does javaFunctions() need the SparkContext?
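
To illustrate the serialization point in plain Python (not Spark code): a task can only be shipped if everything it carries can be serialized. In this rough sketch a threading.Lock stands in for a live driver-side handle such as the context, and the dictionary keys and values are made up for illustration.

```python
import pickle
import threading


def can_serialize(captured_state):
    """Return True if the state a task would carry survives pickling."""
    try:
        pickle.dumps(captured_state)
        return True
    except (TypeError, pickle.PicklingError):
        return False


# Hypothetical captured state: a live driver-side handle (modelled by a lock).
state_with_handle = {"table": "sessions", "handle": threading.Lock()}

# Hypothetical captured state: only plain values needed to reconnect later.
state_with_plain_data = {"table": "sessions", "host": "127.0.0.1"}

handle_ok = can_serialize(state_with_handle)
plain_ok = can_serialize(state_with_plain_data)
print(handle_ok, plain_ok)  # prints: False True
```

The same reasoning is why carrying only plain values (table names, keys, connection settings) into the function tends to work, while carrying the context itself does not.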

On Thu, Oct 23, 2014 at 10:53 AM, Localhost shell 
universal.localh...@gmail.com wrote:

 Bang On Sean

 Before sending the issue mail, I was able to remove the compilation error
 by making it final but then got the
 Caused by: java.io.NotSerializableException:
 org.apache.spark.api.java.JavaSparkContext   (As you mentioned)

 Now regarding your suggestion of changing the business logic,
 1. *Is the current approach possible if I write the code in Scala ?* I
 think probably not but wanted to check with you.

 2. Brief steps of what the code is doing:

   1. Get raw sessions data from datatsore (C*)
 2. Process the raw sessions data
 3. Iterate over the processed data(derive from #2) and fetch the
 previously aggregated data from store for those rowkeys
Add the values from this batch to previous batch values
 4. Save back the updated values

* This github gist might explain you more
 https://gist.github.com/rssvihla/6577359860858ccb0b33
 https://gist.github.com/rssvihla/6577359860858ccb0b33 and it does a
 similar thing in scala.*
 I am trying to achieve a similar thing in Java using Spark Batch with
 C* as the datastore.

 I have attached the java code file to provide you some code details. (If I
 was not able to explain you the problem so the code will be handy)


 The reason why I am fetching only selective data (that I will update
 later) because Cassanbdra doesn't provide range queries so I thought
 fetching complete data might be expensive.

 It will be great if you can share ur thoughts.

 On Thu, Oct 23, 2014 at 1:48 AM, Sean Owen so...@cloudera.com wrote:

 In Java, javaSparkContext would have to be declared final in order for
 it to be accessed inside an inner class like this. But this would
 still not work as the context is not serializable. You  should rewrite
 this so you are not attempting to use the Spark context inside  an
 RDD.

 On Thu, Oct 23, 2014 at 8:46 AM, Localhost shell
 universal.localh...@gmail.com wrote:
  Hey All,
 
  I am unable to access objects declared and initialized outside the
 call()
  method of JavaRDD.
 
  In the below code snippet, call() method makes a fetch call to C* but
 since
  javaSparkContext is defined outside the call method scope so compiler
 give a
  compilation error.
 
  stringRdd.foreach(new VoidFunction<String>() {
      @Override
      public void call(String str) throws Exception {
          JavaRDD<String> vals =
              javaFunctions(javaSparkContext).cassandraTable(schema, table,
                  String.class)
              .select(val);
      }
  });
 
  In other languages I have used closure to do this but not able to
 achieve
  the same here.
 
  Can someone suggest how to achieve this in the current code context?
 
 
  --Unilocal
 
 
 




 --
 --Unilocal





Re: Aggregation Error: org.apache.spark.sql.catalyst.errors.package$TreeNodeException:

2014-10-23 Thread arthur.hk.c...@gmail.com
HI,


My steps to create LINEITEM:
$HADOOP_HOME/bin/hadoop fs -mkdir /tpch/lineitem
$HADOOP_HOME/bin/hadoop fs -copyFromLocal lineitem.tbl /tpch/lineitem/

Create external table lineitem (L_ORDERKEY INT, L_PARTKEY INT, L_SUPPKEY INT, 
L_LINENUMBER INT, L_QUANTITY DOUBLE, L_EXTENDEDPRICE DOUBLE, L_DISCOUNT DOUBLE, 
L_TAX DOUBLE, L_RETURNFLAG STRING, L_LINESTATUS STRING, L_SHIPDATE STRING, 
L_COMMITDATE STRING, L_RECEIPTDATE STRING, L_SHIPINSTRUCT STRING, L_SHIPMODE 
STRING, L_COMMENT STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' STORED 
AS TEXTFILE LOCATION '/tpch/lineitem';

Regards
Arthur


On 23 Oct, 2014, at 9:36 pm, Yin Huai huaiyin@gmail.com wrote:

 Hello Arthur,
 
 You can do aggregations in SQL. How did you create LINEITEM?
 
 Thanks,
 
 Yin
 
 On Thu, Oct 23, 2014 at 8:54 AM, arthur.hk.c...@gmail.com 
 arthur.hk.c...@gmail.com wrote:
 Hi,
 
 I got a $TreeNodeException and have a few questions:
 Q1) How should I do aggregation in Spark? Can I use aggregation directly in 
 SQL? or
 Q2) Should I use SQL to load the data to form an RDD and then use Scala to do the 
 aggregation?
 
 Regards
 Arthur
 
 
 My SQL (good one, without aggregation): 
 sqlContext.sql("""SELECT L_RETURNFLAG FROM LINEITEM WHERE  
 L_SHIPDATE='1998-09-02'  GROUP  BY L_RETURNFLAG, L_LINESTATUS ORDER  BY 
 L_RETURNFLAG, L_LINESTATUS""").collect().foreach(println);
 [A]
 [N]
 [N]
 [R]
 
 
 My SQL (problem SQL, with aggregation):
 sqlContext.sql("""SELECT L_RETURNFLAG, L_LINESTATUS, SUM(L_QUANTITY) AS 
 SUM_QTY, SUM(L_EXTENDEDPRICE) AS SUM_BASE_PRICE, SUM(L_EXTENDEDPRICE * ( 1 - 
 L_DISCOUNT )) AS SUM_DISC_PRICE, SUM(L_EXTENDEDPRICE * ( 1 - L_DISCOUNT ) * ( 
 1 + L_TAX )) AS SUM_CHARGE, AVG(L_QUANTITY) AS AVG_QTY, AVG(L_EXTENDEDPRICE) 
 AS AVG_PRICE, AVG(L_DISCOUNT) AS AVG_DISC, COUNT(*) AS COUNT_ORDER  FROM 
 LINEITEM WHERE  L_SHIPDATE='1998-09-02'  GROUP  BY L_RETURNFLAG, 
 L_LINESTATUS ORDER  BY L_RETURNFLAG, 
 L_LINESTATUS""").collect().foreach(println);
 
 14/10/23 20:38:31 INFO TaskSchedulerImpl: Removed TaskSet 18.0, whose tasks 
 have all completed, from pool 
 org.apache.spark.sql.catalyst.errors.package$TreeNodeException: sort, tree:
 Sort [l_returnflag#200 ASC,l_linestatus#201 ASC], true
  Exchange (RangePartitioning [l_returnflag#200 ASC,l_linestatus#201 ASC], 200)
   Aggregate false, [l_returnflag#200,l_linestatus#201], 
 [l_returnflag#200,l_linestatus#201,SUM(PartialSum#216) AS 
 sum_qty#181,SUM(PartialSum#217) AS sum_base_price#182,SUM(PartialSum#218) AS 
 sum_disc_price#183,SUM(PartialSum#219) AS 
 sum_charge#184,(CAST(SUM(PartialSum#220), DoubleType) / 
 CAST(SUM(PartialCount#221L), DoubleType)) AS 
 avg_qty#185,(CAST(SUM(PartialSum#222), DoubleType) / 
 CAST(SUM(PartialCount#223L), DoubleType)) AS 
 avg_price#186,(CAST(SUM(PartialSum#224), DoubleType) / 
 CAST(SUM(PartialCount#225L), DoubleType)) AS 
 avg_disc#187,SUM(PartialCount#226L) AS count_order#188L]
Exchange (HashPartitioning [l_returnflag#200,l_linestatus#201], 200)
 Aggregate true, [l_returnflag#200,l_linestatus#201], 
 [l_returnflag#200,l_linestatus#201,COUNT(l_discount#195) AS 
 PartialCount#225L,SUM(l_discount#195) AS PartialSum#224,COUNT(1) AS 
 PartialCount#226L,SUM((l_extendedprice#194 * (1.0 - l_discount#195))) AS 
 PartialSum#218,SUM(l_extendedprice#194) AS 
 PartialSum#217,COUNT(l_quantity#193) AS PartialCount#221L,SUM(l_quantity#193) 
 AS PartialSum#220,COUNT(l_extendedprice#194) AS 
 PartialCount#223L,SUM(l_extendedprice#194) AS 
 PartialSum#222,SUM(((l_extendedprice#194 * (1.0 - l_discount#195)) * (1.0 + 
 l_tax#196))) AS PartialSum#219,SUM(l_quantity#193) AS PartialSum#216]
  Project 
 [l_extendedprice#194,l_linestatus#201,l_discount#195,l_returnflag#200,l_tax#196,l_quantity#193]
   Filter (l_shipdate#197 = 1998-09-02)
HiveTableScan 
 [l_shipdate#197,l_extendedprice#194,l_linestatus#201,l_discount#195,l_returnflag#200,l_tax#196,l_quantity#193],
  (MetastoreRelation boc_12, lineitem, None), None
 
   at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:47)
   at org.apache.spark.sql.execution.Sort.execute(basicOperators.scala:191)
   at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:85)
   at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438)
   at $iwC$$iwC$$iwC$$iwC.<init>(<console>:15)
   at $iwC$$iwC$$iwC.<init>(<console>:20)
   at $iwC$$iwC.<init>(<console>:22)
   at $iwC.<init>(<console>:24)
   at <init>(<console>:26)
   at .<init>(<console>:30)
   at .<clinit>(<console>)
   at .<init>(<console>:7)
   at .<clinit>(<console>)
   at $print(<console>)
   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.lang.reflect.Method.invoke(Method.java:606)
   at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
   at 
 

Re: How to access objects declared and initialized outside the call() method of JavaRDD

2014-10-23 Thread Localhost shell
Hey Jayant,

In my previous mail, I mentioned a github gist
*https://gist.github.com/rssvihla/6577359860858ccb0b33* which does something
very similar to what I want to do, but it uses Scala for Spark.

Hence my question (reiterating from previous mail):
*Is the current approach possible if I write the code in Scala?*

Why does javaFunctions() need the SparkContext?
Because for each row in the RDD, I am making a get call to the data store
'cassandra'. The reason I am fetching only selective data (that I will
update later) is that Cassandra doesn't provide range queries, so I thought
fetching complete data might be expensive.



On Thu, Oct 23, 2014 at 11:22 AM, Jayant Shekhar jay...@cloudera.com
wrote:

 +1 to Sean.

 Is it possible to rewrite your code to not use SparkContext in RDD. Or why
 does javaFunctions() need the SparkContext.

 On Thu, Oct 23, 2014 at 10:53 AM, Localhost shell 
 universal.localh...@gmail.com wrote:

 Bang On Sean

 Before sending the issue mail, I was able to remove the compilation error
 by making it final but then got the
 Caused by: java.io.NotSerializableException:
 org.apache.spark.api.java.JavaSparkContext   (As you mentioned)

 Now regarding your suggestion of changing the business logic,
 1. *Is the current approach possible if I write the code in Scala ?* I
 think probably not but wanted to check with you.

 2. Brief steps of what the code is doing:

   1. Get raw sessions data from datatsore (C*)
 2. Process the raw sessions data
 3. Iterate over the processed data(derive from #2) and fetch the
 previously aggregated data from store for those rowkeys
Add the values from this batch to previous batch values
 4. Save back the updated values

* This github gist might explain you more
 https://gist.github.com/rssvihla/6577359860858ccb0b33
 https://gist.github.com/rssvihla/6577359860858ccb0b33 and it does a
 similar thing in scala.*
 I am trying to achieve a similar thing in Java using Spark Batch with
 C* as the datastore.

 I have attached the java code file to provide you some code details. (If
 I was not able to explain you the problem so the code will be handy)


 The reason why I am fetching only selective data (that I will update
 later) because Cassanbdra doesn't provide range queries so I thought
 fetching complete data might be expensive.

 It will be great if you can share ur thoughts.

 On Thu, Oct 23, 2014 at 1:48 AM, Sean Owen so...@cloudera.com wrote:

 In Java, javaSparkContext would have to be declared final in order for
 it to be accessed inside an inner class like this. But this would
 still not work as the context is not serializable. You  should rewrite
 this so you are not attempting to use the Spark context inside  an
 RDD.

 On Thu, Oct 23, 2014 at 8:46 AM, Localhost shell
 universal.localh...@gmail.com wrote:
  Hey All,
 
  I am unable to access objects declared and initialized outside the
 call()
  method of JavaRDD.
 
  In the below code snippet, call() method makes a fetch call to C* but
 since
  javaSparkContext is defined outside the call method scope so compiler
 give a
  compilation error.
 
  stringRdd.foreach(new VoidFunction<String>() {
      @Override
      public void call(String str) throws Exception {
          JavaRDD<String> vals =
              javaFunctions(javaSparkContext).cassandraTable(schema, table,
                  String.class)
              .select(val);
      }
  });
 
  In other languages I have used closure to do this but not able to
 achieve
  the same here.
 
  Can someone suggest how to achieve this in the current code context?
 
 
  --Unilocal
 
 
 




 --
 --Unilocal







-- 
--Unilocal


Re: Transforming the Dstream vs transforming each RDDs in the Dstream.

2014-10-23 Thread Tathagata Das
Hey Gerard,

This is a very good question!

*TL;DR: *The performance should be the same, except in the case of shuffle-based
operations where the number of reducers is not explicitly specified.

Let me answer in more detail by dividing the set of DStream operations into
three categories.

*1. Map-like operations (map, flatmap, filter, etc.) that do not involve
any shuffling of data:* Performance should be virtually the same in both
cases. Either way, in each batch, the operations on the batch's RDD are
first set up on the driver, and then the actions on the RDD are executed.
There are very minor differences between the two cases of early foreachRDD
and late foreachRDD (e.g., cleaning up of function closures), but
those should make almost no difference to the performance.

*2. Operations involving shuffle: *Here there is a subtle difference between
the two cases if the number of partitions is not specified. The default number
of partitions used by dstream.reduceByKey() and the default used by
dstream.foreachRDD(_.reduceByKey()) are different, and one needs to play
around with the number of reducers to see what performs better. But if the
number of reducers is explicitly specified and is the same in both cases, then
the performance should be similar. Note that this difference in the default
numbers is not guaranteed to stay like this; it could change in future
implementations.

*3. Aggregation-like operations (count, reduce): *Here there is another
subtle execution difference between
- dstream.count() which produces a DStream of single-element RDDs, the
element being the count, and
- dstream.foreachRDD(_.count()) which returns the count directly.

In the first case, some random worker node is chosen for the reduce; in
the second, the driver is chosen for the reduce. There should not be a
significant performance difference.

*4. Other operations* including window ops and stateful ops
(updateStateByKey), are obviously not part of the discussion as they cannot
be (easily) done through early foreachRDD.
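
As a toy model of category 1 (plain Python lists, not the Spark API), the sketch below treats a stream as a list of batches and shows that the early and late forms write the same data per target. The record() and is_target() helpers are hypothetical stand-ins for the ones in Gerard's snippet, and the sketch says nothing about timing.

```python
def record(elem):
    """Hypothetical record builder: tag each element with a target id."""
    return {"target": elem % 2, "value": elem}


def is_target(target, rec):
    return rec["target"] == target


def early_foreach(batches, targets):
    """Per-batch step first (like early foreachRDD), map and filter inside."""
    written = []
    for batch in batches:
        records = [record(e) for e in batch]
        for target in targets:
            values = [r["value"] for r in records if is_target(target, r)]
            written.append((target, values))
    return written


def late_foreach(batches, targets):
    """Map and filter the whole stream first, per-batch step last."""
    record_stream = [[record(e) for e in batch] for batch in batches]
    written = []
    for target in targets:
        filtered = [[r for r in batch if is_target(target, r)]
                    for batch in record_stream]
        for batch in filtered:
            written.append((target, [r["value"] for r in batch]))
    return written


batches = [[1, 2, 3], [4, 5]]
targets = [0, 1]
early = early_foreach(batches, targets)
late = late_foreach(batches, targets)
print(sorted(early) == sorted(late))  # prints: True
```

Only the order of the writes differs (batch-major versus target-major), which is why the comparison sorts both results first.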

Hope this helps!

TD

PS: Sorry for not noticing this question earlier.

On Wed, Oct 22, 2014 at 5:37 AM, Gerard Maas gerard.m...@gmail.com wrote:

 PS: Just to clarify my statement:

 Unlike the feared RDD operations on the driver, it's my understanding
 that these Dstream ops on the driver are merely creating an execution plan
 for each RDD.

 With feared RDD operations on the driver I meant to contrast an rdd
 action like rdd.collect that would pull all rdd data to the driver, with
 dstream.foreachRDD(rdd => rdd.op) for which documentation says 'it runs on
 the driver' yet, all that it looks to be running on the driver is the
 scheduling of 'op' on that rdd, just like it happens for all rdd other
 operations
 (thanks to Sean for the clarification)

 So, not to move focus away from the original question:

 In Spark Streaming, would it be better to do foreachRDD early in a
 pipeline or instead do as much Dstream transformations before going into
 the foreachRDD call?

 Between these two pieces of code, from a performance perspective, what
 would be preferred and why:

 - Early foreachRDD:

 dstream.foreachRDD { rdd =>
   val records = rdd.map(elem => record(elem))
   targets.foreach(target => records.filter{record =>
     isTarget(target,record)}.writeToCassandra(target,table))
 }

 - As many dstream transformations as possible before foreachRDD:

 val recordStream = dstream.map(elem => record(elem))
 targets.foreach{target => recordStream.filter(record =>
 isTarget(target,record)).foreachRDD(_.writeToCassandra(target,table))}

 ?

 kr, Gerard.



 On Wed, Oct 22, 2014 at 2:12 PM, Gerard Maas gerard.m...@gmail.com
 wrote:

 Thanks Matt,

 Unlike the feared RDD operations on the driver, it's my understanding
 that these Dstream ops on the driver are merely creating an execution plan
 for each RDD.
 My question still remains: Is it better to foreachRDD early in the
 process or do as much Dstream transformations before going into the
 foreachRDD call?

 Maybe this will require some empirical testing specific to each
 implementation?

 -kr, Gerard.


 On Mon, Oct 20, 2014 at 5:07 PM, Matt Narrell matt.narr...@gmail.com
 wrote:

 http://spark.apache.org/docs/latest/streaming-programming-guide.html

 foreachRDD is executed on the driver….

 mn

 On Oct 20, 2014, at 3:07 AM, Gerard Maas gerard.m...@gmail.com wrote:

 Pinging TD  -- I'm sure you know :-)

 -kr, Gerard.

 On Fri, Oct 17, 2014 at 11:20 PM, Gerard Maas gerard.m...@gmail.com
 wrote:

 Hi,

 We have been implementing several Spark Streaming jobs that are
 basically processing data and inserting it into Cassandra, sorting it among
 different keyspaces.

 We've been following the pattern:

 dstream.foreachRDD { rdd =>
   val records = rdd.map(elem => record(elem))
   targets.foreach(target => records.filter{record =>
     isTarget(target,record)}.writeToCassandra(target,table))
 }

 I've been wondering whether there would be a performance difference in
 transforming the dstream 

Re: Spark Streaming Applications

2014-10-23 Thread Tathagata Das
Cc'ing Helena for more information on this.

TD

On Thu, Oct 23, 2014 at 6:30 AM, Saiph Kappa saiph.ka...@gmail.com wrote:

 What is the application about? I couldn't find any proper description
 regarding the purpose of killrweather ( I mean, other than just integrating
 Spark with Cassandra). Do you know if the slides of that tutorial are
 available somewhere?

 Thanks!

 On Wed, Oct 22, 2014 at 6:58 PM, Sameer Farooqui same...@databricks.com
 wrote:

 Hi Saiph,

 Patrick McFadin and Helena Edelson from DataStax taught a tutorial at NYC
 Strata last week where they created a prototype Spark Streaming + Kafka
 application for time series data.

 You can see the code here:
 https://github.com/killrweather/killrweather


 On Tue, Oct 21, 2014 at 4:33 PM, Saiph Kappa saiph.ka...@gmail.com
 wrote:

 Hi,

 I have been trying to find a fairly complex application that makes use
 of the Spark Streaming framework. I checked public github repos but the
 examples I found were too simple, only comprising simple operations like
 counters and sums. On the Spark summit website, I could find very
 interesting projects, however no source code was available.

 Where can I find non-trivial spark streaming application code? Is it
 that difficult?

 Thanks.






JavaHiveContext class not found error. Help!!

2014-10-23 Thread nitinkak001
I am trying to run the below Hive query on Yarn. I am using Cloudera 5.1.
What can I do to make this work?

SELECT * FROM table_name DISTRIBUTE BY GEO_REGION, GEO_COUNTRY SORT BY
IP_ADDRESS, COOKIE_ID;

Below is the stack trace:

Exception in thread "Thread-4" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:187)
Caused by: *java.lang.NoClassDefFoundError: org/apache/spark/sql/hive/api/java/JavaHiveContext*
at HiveContextExample.main(HiveContextExample.java:57)
... 5 more
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.hive.api.java.JavaHiveContext
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
... 6 more


Here is the code invoking it:

SparkConf conf = new SparkConf().setAppName("PartitionData");

JavaSparkContext ctx = new JavaSparkContext(conf);

JavaHiveContext hiveContext = new JavaHiveContext(ctx);

String sql = "SELECT * FROM table_name DISTRIBUTE BY GEO_REGION, "
        + "GEO_COUNTRY SORT BY IP_ADDRESS, COOKIE_ID";

JavaSchemaRDD partitionedRDD = hiveContext.sql(sql);



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/JavaHiveContext-class-not-found-error-Help-tp17149.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: How to access objects declared and initialized outside the call() method of JavaRDD

2014-10-23 Thread lordjoe
What I have been doing is building a JavaSparkContext the first time it is
needed and keeping it in a ThreadLocal. All my code uses
SparkUtilities.getCurrentContext(). On a slave machine you build a new
context and don't have to serialize it.
The code is in a large project at
https://code.google.com/p/distributed-tools/ - a work in progress, but the
Spark aficionados on this list will say whether the approach is kosher.

import java.io.Serializable;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class SparkUtilities implements Serializable {
    private transient static ThreadLocal<JavaSparkContext> threadContext;
    private static String appName = "Anonymous";

    public static String getAppName() {
        return appName;
    }

    public static void setAppName(final String pAppName) {
        appName = pAppName;
    }

    /**
     * Create a JavaSparkContext for the thread if none exists.
     *
     * @return the context bound to the current thread
     */
    public static synchronized JavaSparkContext getCurrentContext() {
        if (threadContext == null)
            threadContext = new ThreadLocal<JavaSparkContext>();
        JavaSparkContext ret = threadContext.get();
        if (ret != null)
            return ret;
        SparkConf sparkConf = new SparkConf().setAppName(getAppName());

        // Here do operations you would do to initialize a context
        ret = new JavaSparkContext(sparkConf);

        threadContext.set(ret);
        return ret;
    }
}
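
For readers who want the bare idea without Spark on the classpath, here is a rough Python sketch of the same per-thread lazy initialisation. A plain object() is a placeholder for the context, and get_current_resource is a made-up name; whether building a context on a worker is advisable is a separate question this sketch does not settle.

```python
import threading

_thread_state = threading.local()


def get_current_resource(factory):
    """Return this thread's resource, building it on first use."""
    resource = getattr(_thread_state, "resource", None)
    if resource is None:
        resource = factory()
        _thread_state.resource = resource
    return resource


# Two calls on the same thread reuse one object.
first = get_current_resource(object)
second = get_current_resource(object)

# A different thread builds and keeps its own object.
seen_in_worker = []
worker = threading.Thread(
    target=lambda: seen_in_worker.append(get_current_resource(object)))
worker.start()
worker.join()

print(first is second, seen_in_worker[0] is first)  # prints: True False
```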




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-access-objects-declared-and-initialized-outside-the-call-method-of-JavaRDD-tp17094p17150.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: JavaHiveContext class not found error. Help!!

2014-10-23 Thread Marcelo Vanzin
Hello there,

This is more of a question for the cdh-users list, but in any case...
In CDH 5.1 we skipped packaging of the Hive module in SparkSQL. That
has been fixed in CDH 5.2, so if it's possible for you I'd recommend
upgrading.

On Thu, Oct 23, 2014 at 2:53 PM, nitinkak001 nitinkak...@gmail.com wrote:
 I am trying to run the below Hive query on Yarn. I am using Cloudera 5.1.
 What can I do to make this work?

 /SELECT * FROM table_name DISTRIBUTE BY GEO_REGION, GEO_COUNTRY SORT BY
 IP_ADDRESS, COOKIE_ID;/

 Below is the stack trace:

 Exception in thread Thread-4 java.lang.reflect.InvocationTargetException
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
 org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:187)
 Caused by: *java.lang.NoClassDefFoundError:
 org/apache/spark/sql/hive/api/java/JavaHiveContext*
 at HiveContextExample.main(HiveContextExample.java:57)
 ... 5 more
 Caused by: java.lang.ClassNotFoundException:
 org.apache.spark.sql.hive.api.java.JavaHiveContext
 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
 ... 6 more


 Here is the code invoking it:

 /SparkConf conf = new SparkConf().setAppName(PartitionData);

 JavaSparkContext ctx = new JavaSparkContext(conf);

 JavaHiveContext hiveContext = new JavaHiveContext(ctx);

 String sql = SELECT * FROM table_name DISTRIBUTE BY 
 GEO_REGION,
 GEO_COUNTRY SORT BY IP_ADDRESS, COOKIE_ID;

 JavaSchemaRDD partitionedRDD = hiveContext.sql(sql);/



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/JavaHiveContext-class-not-found-error-Help-tp17149.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





-- 
Marcelo




spark is running extremely slow with larger data set, like 2G

2014-10-23 Thread xuhongnever
My Spark version is 1.1.0, pre-built with Hadoop 1.x.
My code is implemented in Python and tries to convert a graph data set from an
edge list to an adjacency list.

spark is running in standalone mode

It runs well with a small data set like soc-liveJournal1, about 1G

Then I ran it on a 25G Twitter graph, and one task won't finish for hours.

I tried the input both from NFS and HDFS
http://apache-spark-user-list.1001560.n3.nabble.com/file/n17152/48.png 

What could be the problem? 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-is-running-extremely-slow-with-larger-data-set-like-2G-tp17152.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: spark is running extremely slow with larger data set, like 2G

2014-10-23 Thread xuhongnever
my code is here:

from pyspark import SparkConf, SparkContext

def Undirect(edge):
    # Keep only lines whose first tab-separated field is a numeric node id
    vector = edge.strip().split('\t')
    if(vector[0].isdigit()):
        return [(vector[0], vector[1])]
    return []


conf = SparkConf()
conf.setMaster("spark://compute-0-14:7077")
conf.setAppName("adjacencylist")
conf.set("spark.executor.memory", "1g")

sc = SparkContext(conf = conf)

file = sc.textFile("file:///home/xzhang/data/soc-LiveJournal1.txt")
records = file.flatMap(lambda line: Undirect(line)).reduceByKey(lambda a, b: a + "\t" + b)
#print(records.count())
#records = records.sortByKey()
records = records.map(lambda line: line[0] + "\t" + line[1])
records.saveAsTextFile("file:///home/xzhang/data/result")
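
For checking the parsing and grouping logic on a small sample without a cluster, the same transformation can be sketched in plain Python. This is only a local sanity check under assumed input (tab-separated edges with optional '#' comment lines), not a diagnosis of the slowdown; parse_edge and to_adjacency_lines are names made up here.

```python
from collections import defaultdict


def parse_edge(line):
    """Return a (source, target) pair, or None for comments and bad lines."""
    fields = line.strip().split("\t")
    if len(fields) >= 2 and fields[0].isdigit():
        return fields[0], fields[1]
    return None


def to_adjacency_lines(lines):
    """Group targets by source, then join each group once."""
    neighbours = defaultdict(list)
    for line in lines:
        edge = parse_edge(line)
        if edge is not None:
            neighbours[edge[0]].append(edge[1])
    return [src + "\t" + "\t".join(dsts) for src, dsts in neighbours.items()]


sample = ["# FromNodeId\tToNodeId", "0\t1", "0\t2", "1\t2", ""]
result = to_adjacency_lines(sample)
print(result)  # prints: ['0\t1\t2', '1\t2']
```

Collecting neighbours in a list and joining once avoids building a new, longer string on every merge, which is what the pairwise a + "\t" + b reduce does.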



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-is-running-extremely-slow-with-larger-data-set-like-2G-tp17152p17153.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Spark 1.1.0 and Hive 0.12.0 Compatibility Issue

2014-10-23 Thread Arthur . hk . chan
Hi

My Spark is 1.1.0 and Hive is 0.12. I ran the same query in both 
Hive 0.12.0 and Spark 1.1.0; HiveQL works while Spark SQL fails. 


hive select l_orderkey, sum(l_extendedprice*(1-l_discount)) as revenue, 
o_orderdate, o_shippriority from customer c join orders o on c.c_mktsegment = 
'BUILDING' and c.c_custkey = o.o_custkey join lineitem l on l.l_orderkey = 
o.o_orderkey where o_orderdate  '1995-03-15' and l_shipdate  '1995-03-15' 
group by l_orderkey, o_orderdate, o_shippriority order by revenue desc, 
o_orderdate limit 10;
Ended Job = job_1414067367860_0011
MapReduce Jobs Launched: 
Job 0: Map: 1  Reduce: 1   Cumulative CPU: 2.0 sec   HDFS Read: 261 HDFS Write: 
96 SUCCESS
Job 1: Map: 1  Reduce: 1   Cumulative CPU: 0.88 sec   HDFS Read: 458 HDFS 
Write: 0 SUCCESS
Total MapReduce CPU Time Spent: 2 seconds 880 msec
OK
Time taken: 38.771 seconds


scala> sqlContext.sql("select l_orderkey, sum(l_extendedprice*(1-l_discount)) 
as revenue, o_orderdate, o_shippriority from customer c join orders o on 
c.c_mktsegment = 'BUILDING' and c.c_custkey = o.o_custkey join lineitem l on 
l.l_orderkey = o.o_orderkey where o_orderdate < '1995-03-15' and l_shipdate > 
'1995-03-15' group by l_orderkey, o_orderdate, o_shippriority order by revenue 
desc, o_orderdate limit 10").collect().foreach(println);
org.apache.spark.SparkException: Job aborted due to stage failure: Task 14 in 
stage 5.0 failed 4 times, most recent failure: Lost task 14.3 in stage 5.0 (TID 
568, m34): java.lang.ClassCastException: java.lang.String cannot be cast to 
scala.math.BigDecimal
scala.math.Numeric$BigDecimalIsFractional$.minus(Numeric.scala:182)
org.apache.spark.sql.catalyst.expressions.Subtract$$anonfun$eval$3.apply(arithmetic.scala:64)
org.apache.spark.sql.catalyst.expressions.Subtract$$anonfun$eval$3.apply(arithmetic.scala:64)
org.apache.spark.sql.catalyst.expressions.Expression.n2(Expression.scala:114)
org.apache.spark.sql.catalyst.expressions.Subtract.eval(arithmetic.scala:64)
org.apache.spark.sql.catalyst.expressions.Expression.n2(Expression.scala:108)
org.apache.spark.sql.catalyst.expressions.Multiply.eval(arithmetic.scala:70)
org.apache.spark.sql.catalyst.expressions.Coalesce.eval(nullFunctions.scala:47)
org.apache.spark.sql.catalyst.expressions.Expression.n2(Expression.scala:108)
org.apache.spark.sql.catalyst.expressions.Add.eval(arithmetic.scala:58)
org.apache.spark.sql.catalyst.expressions.MutableLiteral.update(literals.scala:69)
org.apache.spark.sql.catalyst.expressions.SumFunction.update(aggregates.scala:433)
org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:167)
org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:151)
org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
at 

Re: unable to make a custom class as a key in a pairrdd

2014-10-23 Thread Niklas Wilcke
Hi Jao,

I don't really know why this doesn't work, but I have two hints.
First, you don't need to override hashCode and equals; the case modifier
generates them for you. Writing

case class PersonID(id: String)

should be enough to get the class you want, I think.
Second, if I change the type of the id parameter to Int, it works for me, but
I don't know why.

case class PersonID(id: Int)

This looks like strange behavior to me. Give it a try.
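
To illustrate the first hint in a form that can be run without a cluster, here is a small Python analogue (an assumption for illustration only, not Spark or Scala code): a frozen dataclass gets generated value-based equality and hashing, much as a case class does, so equal keys land in the same group. `PersonID` and `group_by_key` below are hypothetical stand-ins.

```python
from collections import defaultdict
from dataclasses import dataclass

@dataclass(frozen=True)
class PersonID:
    # Hypothetical stand-in for the Scala case class from the thread.
    # frozen=True (with the default eq=True) generates __eq__ and __hash__.
    id: str

def group_by_key(pairs):
    """Group values by key, relying on the key's __eq__ and __hash__."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)

pairs = [(PersonID("1"), x) for x in range(1, 10)]
grouped = group_by_key(pairs)
print(len(grouped))  # 1
```

With consistent equality and hashing, all nine values end up under a single key, which is the result the original poster expected from groupByKey.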

Good luck,
Niklas

On 23.10.2014 21:52, Jaonary Rabarisoa wrote:
 Hi all,

 I have the following case class that I want to use as a key in a
 key-value RDD. I defined the equals and hashCode methods, but it's not
 working. What am I doing wrong?

 case class PersonID(id: String) {

   override def hashCode = id.hashCode

   override def equals(other: Any) = other match {
     case that: PersonID => this.id == that.id && this.getClass == that.getClass
     case _ => false
   }
 }

 val p = sc.parallelize((1 until 10).map(x => (PersonID("1"), x)))

 p.groupByKey.collect foreach println

 (PersonID(1),CompactBuffer(5))
 (PersonID(1),CompactBuffer(6))
 (PersonID(1),CompactBuffer(7))
 (PersonID(1),CompactBuffer(8, 9))
 (PersonID(1),CompactBuffer(1))
 (PersonID(1),CompactBuffer(2))
 (PersonID(1),CompactBuffer(3))
 (PersonID(1),CompactBuffer(4))

 Best,

 Jao



Re: Exceptions not caught?

2014-10-23 Thread Ted Yu
Can you show the stack trace?

Also, how do you catch exceptions? Did you specify TProtocolException?

Cheers

On Thu, Oct 23, 2014 at 3:40 PM, ankits ankitso...@gmail.com wrote:

 Hi, I'm running a spark job and encountering an exception related to
 thrift.
 I wanted to know where this is being thrown, but the stack trace is
 completely useless. So I started adding try catches, to the point where my
 whole main method that does everything is surrounded with a try catch. Even
 then, nothing is being caught. I still see this message though:

 2014-10-23 15:39:50,845  ERROR [] Exception in task 1.0 in stage 1.0 (TID
 1)
 java.io.IOException: org.apache.thrift.protocol.TProtocolException:
 .

 What is going on? Why isn't the exception just being handled by the
 try-catch? (BTW this is in Scala)



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Exceptions-not-caught-tp17157.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: Exceptions not caught?

2014-10-23 Thread ankits
I am simply catching all exceptions (like case e: Throwable =>
println("caught: " + e)).

Here is the stack trace:

2014-10-23 15:51:10,766  ERROR [] Exception in task 1.0 in stage 1.0 (TID 1)
java.io.IOException: org.apache.thrift.protocol.TProtocolException: Required
field 'X' is unset! Struct:Y(id:, ts:1409094360004, type:NON,
response_time:2, now:1409094360, env_type:PROD,)
at com.A.thrift.Y.writeObject(Y.java:8489)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:195)
at org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:67)
at org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:65)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:65)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:54)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.thrift.protocol.TProtocolException: Required field 'X'
is unset! Struct:Y(id:, ts:1409094360004, type:NON, response_time:2,
now:1409094360, env_type:PROD, ...)
at com.A.thrift.Y.validate(Y:8428)
at com.A.thrift.Y$YStandardScheme.write(Y.java:9359)
at com.A.thrift.Y$FlatAdserverEventStandardScheme.write(Y.java:8509)
at com.A.thrift.Y.write(Y.java:7646)
at com.A.thrift.Y.writeObject(Y.java:8487)
... 27 more
2014-10-23 15:51:10,766 11234 ERROR [] Exception in task 0.0 in stage 1.0 (TID 0)
java.io.IOException: org.apache.thrift.protocol.TProtocolException: Required
field 'X' is unset! Struct:Y(id:, ts:1409094360004, type:NON,
response_time:2, now:1409094360, ...)
at com.A.thrift.Y.writeObject(Y.java:8489)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:195)
at org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:67)
at org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:65)
at 

Re: Spark 1.1.0 and Hive 0.12.0 Compatibility Issue

2014-10-23 Thread Michael Armbrust
Can you show the DDL for the table?  It looks like the SerDe might be
saying it will produce a decimal type but is actually producing a string.
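
As a rough illustration of that kind of mismatch (plain Python, not Spark or Hive code; the schema dictionary and helper name are hypothetical), the sketch below declares two columns as decimal while one row actually carries a string. The arithmetic from the query then fails at runtime, and a simple type check points at the offending column.

```python
from decimal import Decimal

# Hypothetical declared schema: column name -> type the reader promises to produce.
declared_schema = {"l_extendedprice": Decimal, "l_discount": Decimal}

def find_type_mismatches(row, schema):
    """Return the columns whose runtime value does not match the declared type."""
    return [name for name, expected in schema.items()
            if not isinstance(row[name], expected)]

# The row is declared as all-decimal but actually carries a string for l_discount.
row = {"l_extendedprice": Decimal("100.00"), "l_discount": "0.05"}

print(find_type_mismatches(row, declared_schema))  # ['l_discount']

try:
    revenue = row["l_extendedprice"] * (1 - row["l_discount"])
except TypeError as exc:
    # Analogous in spirit to the ClassCastException in the trace.
    print("arithmetic failed:", exc)
```

Checking the table's DDL against a few sample values in this way is one possible first step; whether the SerDe is really at fault here is not confirmed in the thread.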

On Thu, Oct 23, 2014 at 3:17 PM, arthur.hk.c...@gmail.com 
arthur.hk.c...@gmail.com wrote:

 Hi

 My Spark is 1.1.0 and Hive is 0.12,  I tried to run the same query in both
 Hive-0.12.0 then Spark-1.1.0,  HiveQL works while SparkSQL failed.


 hive> select l_orderkey, sum(l_extendedprice*(1-l_discount)) as revenue,
 o_orderdate, o_shippriority from customer c join orders o on c.c_mktsegment
 = 'BUILDING' and c.c_custkey = o.o_custkey join lineitem l on l.l_orderkey
 = o.o_orderkey where o_orderdate < '1995-03-15' and l_shipdate >
 '1995-03-15' group by l_orderkey, o_orderdate, o_shippriority order by
 revenue desc, o_orderdate limit 10;
 Ended Job = job_1414067367860_0011
 MapReduce Jobs Launched:
 Job 0: Map: 1  Reduce: 1   Cumulative CPU: 2.0 sec   HDFS Read: 261 HDFS
 Write: 96 SUCCESS
 Job 1: Map: 1  Reduce: 1   Cumulative CPU: 0.88 sec   HDFS Read: 458 HDFS
 Write: 0 SUCCESS
 Total MapReduce CPU Time Spent: 2 seconds 880 msec
 OK
 Time taken: 38.771 seconds


 scala> sqlContext.sql("select l_orderkey,
 sum(l_extendedprice*(1-l_discount)) as revenue, o_orderdate, o_shippriority
 from customer c join orders o on c.c_mktsegment = 'BUILDING' and
 c.c_custkey = o.o_custkey join lineitem l on l.l_orderkey = o.o_orderkey
 where o_orderdate < '1995-03-15' and l_shipdate > '1995-03-15' group by
 l_orderkey, o_orderdate, o_shippriority order by revenue desc, o_orderdate
 limit 10").collect().foreach(println);
 org.apache.spark.SparkException: Job aborted due to stage failure: Task 14
 in stage 5.0 failed 4 times, most recent failure: Lost task 14.3 in stage
 5.0 (TID 568, m34): java.lang.ClassCastException: java.lang.String cannot
 be cast to scala.math.BigDecimal
 scala.math.Numeric$BigDecimalIsFractional$.minus(Numeric.scala:182)

 org.apache.spark.sql.catalyst.expressions.Subtract$$anonfun$eval$3.apply(arithmetic.scala:64)

 org.apache.spark.sql.catalyst.expressions.Subtract$$anonfun$eval$3.apply(arithmetic.scala:64)

 org.apache.spark.sql.catalyst.expressions.Expression.n2(Expression.scala:114)

 org.apache.spark.sql.catalyst.expressions.Subtract.eval(arithmetic.scala:64)

 org.apache.spark.sql.catalyst.expressions.Expression.n2(Expression.scala:108)

 org.apache.spark.sql.catalyst.expressions.Multiply.eval(arithmetic.scala:70)

 org.apache.spark.sql.catalyst.expressions.Coalesce.eval(nullFunctions.scala:47)

 org.apache.spark.sql.catalyst.expressions.Expression.n2(Expression.scala:108)

 org.apache.spark.sql.catalyst.expressions.Add.eval(arithmetic.scala:58)

 org.apache.spark.sql.catalyst.expressions.MutableLiteral.update(literals.scala:69)

 org.apache.spark.sql.catalyst.expressions.SumFunction.update(aggregates.scala:433)

 org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:167)

 org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:151)
 org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
 org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)

 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:229)

 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:229)

 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)

 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 org.apache.spark.scheduler.Task.run(Task.scala:54)

 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 java.lang.Thread.run(Thread.java:745)
 Driver stacktrace:
 at org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at
 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
 at scala.Option.foreach(Option.scala:236)
 at
 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
 

Re: Exceptions not caught?

2014-10-23 Thread ankits
Also everything is running locally on my box, driver and workers.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Exceptions-not-caught-tp17157p17160.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Exceptions not caught?

2014-10-23 Thread Ted Yu
bq. Required field 'X' is unset! Struct:Y

Can you check your class Y and fix the above ?

Cheers

On Thu, Oct 23, 2014 at 3:55 PM, ankits ankitso...@gmail.com wrote:

 I am simply catching all exceptions (like case e: Throwable =>
 println("caught: " + e)).

 Here is the stack trace:

 2014-10-23 15:51:10,766  ERROR [] Exception in task 1.0 in stage 1.0 (TID
 1)
 java.io.IOException: org.apache.thrift.protocol.TProtocolException:
 Required
 field 'X' is unset! Struct:Y(id:, ts:1409094360004, type:NON,
 response_time:2, now:1409094360, env_type:PROD,)
 at com.A.thrift.Y.writeObject(Y.java:8489)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at

 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at

 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
 java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
 at
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
 at

 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 at
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 at

 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 at
 java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
 at

 org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
 at

 org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:195)
 at

 org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:67)
 at

 org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:65)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at

 org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:65)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:54)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
 at

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)
 Caused by: org.apache.thrift.protocol.TProtocolException: Required field
 'X'
 is unset! Struct:Y(id:, ts:1409094360004, type:NON, response_time:2,
 now:1409094360, env_type:PROD, ...)
 at com.A.thrift.Y.validate(Y:8428)
 at com.A.thrift.Y$YStandardScheme.write(Y.java:9359)
 at
 com.A.thrift.Y$FlatAdserverEventStandardScheme.write(Y.java:8509)
 at com.A.thrift.Y.write(Y.java:7646)
 at com.A.thrift.Y.writeObject(Y.java:8487)
 ... 27 more
 2014-10-23 15:51:10,766 11234 ERROR [] Exception in task 0.0 in stage 1.0
 (TID 0)
 java.io.IOException: org.apache.thrift.protocol.TProtocolException:
 Required
 field 'X' is unset! Struct:Y(id:, ts:1409094360004, type:NON,
 response_time:2, now:1409094360, ...)
 at com.A.thrift.YwriteObject(Y.java:8489)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at

 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at

 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
 java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
 at
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
 at

 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 at
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 at

 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 at
 java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
 at

 org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
 at

 

Re: Exceptions not caught?

2014-10-23 Thread ankits
 Can you check your class Y and fix the above ?

I can, but my question is about catching the exception, whichever class in
the Spark job throws it. Why is the exception not being caught?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Exceptions-not-caught-tp17157p17163.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Exceptions not caught?

2014-10-23 Thread Marcelo Vanzin
On Thu, Oct 23, 2014 at 3:40 PM, ankits ankitso...@gmail.com wrote:
 2014-10-23 15:39:50,845  ERROR [] Exception in task 1.0 in stage 1.0 (TID 1)
 java.io.IOException: org.apache.thrift.protocol.TProtocolException:

This looks like an exception that's happening on an executor and just
being reported in the driver's logs, so there's nothing to catch in
the driver, which might explain why you're not catching anything.
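
A minimal sketch of that situation, in plain Python threads rather than Spark (all names here are hypothetical): the task fails on a worker thread, the worker logs the failure, and a try/except wrapped around the code that launched the worker never sees the exception.

```python
import threading

def run_task_in_worker(task):
    """Run `task` on a separate thread, loosely like an executor thread runs a task.

    Returns (caught_by_caller, logged_errors).
    """
    logged_errors = []

    def worker():
        try:
            task()
        except Exception as exc:
            # The worker reports the failure itself; nothing propagates to the caller.
            logged_errors.append("ERROR Exception in task: %s" % exc)

    caught_by_caller = False
    try:
        thread = threading.Thread(target=worker)
        thread.start()
        thread.join()
    except Exception:
        caught_by_caller = True  # never reached for failures inside the worker
    return caught_by_caller, logged_errors

def failing_task():
    raise ValueError("Required field 'X' is unset!")

caught, logged = run_task_in_worker(failing_task)
print(caught)  # False
print(logged)
```

This only models the "logged elsewhere, not thrown here" part of the explanation; how Spark itself reports task failures back to the driver is not shown.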

-- 
Marcelo




Re: About Memory usage in the Spark UI

2014-10-23 Thread Tathagata Das
The memory usage of blocks of data received through Spark Streaming is not
reflected in the Spark UI. It only shows the memory usage due to cached
RDDs.
I didn't find a JIRA for this, so I opened a new one.

https://issues.apache.org/jira/browse/SPARK-4072


TD

On Thu, Oct 23, 2014 at 12:47 AM, Haopu Wang hw...@qilinsoft.com wrote:

 Patrick, thanks for the response. May I ask more questions?

 I'm running a Spark Streaming application which receives data from a socket
 and does some transformations.

 The event injection rate is too high, so the processing duration is longer
 than the batch interval.

 So I see the "Could not compute split, block input-0-1414049609200 not found"
 issue, as discussed by others in this post:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Could-not-compute-split-block-not-found-td11186.html#a11237

 If my understanding is correct, Spark lacks storage in this case
 because of the event pile-up, so it needs to delete some splits in order to
 free memory.

 However, even in this case, I still see a very small number (like 3MB) in
 the Memory Used column, whereas the total memory seems to be quite big (like
 6GB). So I think the number shown in this column may be wrong.

 How does Spark calculate the total memory based on the allocated JVM heap size?
 I guess it's related to the spark.storage.memoryFraction configuration,
 but I want to know the details.
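
The question is not answered in this thread. As a hedged sketch only: as far as I know, Spark 1.x sizes the block storage pool as the JVM's maximum heap times spark.storage.memoryFraction (default 0.6) times spark.storage.safetyFraction (default 0.9). Those defaults and the helper name below are assumptions, not something stated in this thread.

```python
GIB = 1024 ** 3

def storage_memory_bytes(max_heap_bytes, memory_fraction=0.6, safety_fraction=0.9):
    """Approximate storage pool size: heap * memoryFraction * safetyFraction.

    The default fractions are assumed Spark 1.x defaults, not taken from the thread.
    """
    return int(max_heap_bytes * memory_fraction * safety_fraction)

executor_storage = storage_memory_bytes(12 * GIB)  # 12GB executor from the thread
driver_storage = storage_memory_bytes(2 * GIB)     # 2GB driver from the thread

print(round(executor_storage / GIB, 2))  # 6.48
print(round(driver_storage / GIB, 2))    # 1.08
```

Under these assumptions a 12GB executor would report roughly 6.5GB of storage memory, in line with the "like 6GB" total mentioned above; the heap size the JVM reports is usually a little below the configured maximum, so the real figure would be slightly smaller.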

 And why does the driver also use memory to store RDD blocks?

 Thanks again for the answer!


  --

 *From:* Patrick Wendell [mailto:pwend...@gmail.com]
 *Sent:* October 23, 2014 14:00
 *To:* Haopu Wang
 *Cc:* user
 *Subject:* Re: About Memory usage in the Spark UI

 It shows the amount of memory used to store RDD blocks, which are created
 when you run .cache()/.persist() on an RDD.

 On Wed, Oct 22, 2014 at 10:07 PM, Haopu Wang hw...@qilinsoft.com wrote:

 Hi, please take a look at the attached screenshot. I wonder what the
 Memory Used column means.

 I give 2GB of memory to the driver process and 12GB of memory to the executor
 process.

 Thank you!







Spark using HDFS data [newb]

2014-10-23 Thread matan
Hi, 

I would like to verify or correct my understanding of Spark at the
conceptual level. 
As I understand it, ignoring streaming mode for a minute, Spark takes some
input data (which can be an HDFS file), lets your code transform the data,
and ultimately dispatches some computation over the data across its cluster,
thus being a distributed computing platform. It probably/typically splits
the dataset for the computation across the cluster machines, so that each
machine participating in the Spark cluster performs the computation on
its subset of the data. 
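
The conceptual model described above can be sketched in a few lines of plain Python. This is only an illustration of "split, compute per subset, combine" on one machine; it uses no Spark API, and the function names are hypothetical.

```python
def split_into_partitions(records, num_partitions):
    """Split records into contiguous, roughly equal chunks, one per worker."""
    size, extra = divmod(len(records), num_partitions)
    partitions, start = [], 0
    for i in range(num_partitions):
        end = start + size + (1 if i < extra else 0)
        partitions.append(records[start:end])
        start = end
    return partitions

def run_distributed(records, num_partitions, per_partition, combine):
    """Apply per_partition to each chunk independently, then combine the partial results."""
    partials = [per_partition(p) for p in split_into_partitions(records, num_partitions)]
    return combine(partials)

total = run_distributed(list(range(1, 11)), 3, sum, sum)
print(total)  # 55
```

In a real cluster each chunk would be processed on a different machine; here the list comprehension simply stands in for that parallel step.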

Is that the case? 
If I nailed it, how does it then handle a distributed HDFS file? Does
it pull the whole file to/through one Spark server and partition it from
there across its cluster? Or does it partition the HDFS file across its
cluster without such a bottleneck, somehow intelligently letting each Spark
server pull some of the data from HDFS? Or does it all rely on Spark being
installed on each HDFS server and just using the HDFS file chunks of that
server locally, without transporting any input HDFS data at all? 

Many thanks! 
Matan



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-using-HDFS-data-newb-tp17169.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Problem packing spark-assembly jar

2014-10-23 Thread Yana Kadiyska
Hi folks,

I'm trying to deploy the latest from the master branch and am having some
trouble with the assembly jar.

In the official spark-1.1 distribution (I use the CDH version), I see the
following jars, where spark-assembly-1.1.0-hadoop2.0.0-mr1-cdh4.2.0.jar
contains a ton of stuff:
datanucleus-api-jdo-3.2.1.jar
datanucleus-core-3.2.2.jar
datanucleus-rdbms-3.2.1.jar
spark-assembly-1.1.0-hadoop2.0.0-mr1-cdh4.2.0.jar
spark-examples-1.1.0-hadoop2.0.0-mr1-cdh4.2.0.jar
spark-hive-thriftserver_2.10-1.1.0.jar
spark-hive_2.10-1.1.0.jar
spark-sql_2.10-1.1.0.jar


I tried to create a similar distribution off of master by running
mvn -Phive -Dhadoop.version=2.0.0-mr1-cdh4.2.0 -DskipTests -Pbigtop-dist package
and
./make-distribution.sh -Pbigtop-dist -Phive -Dhadoop.version=2.0.0-mr1-cdh4.2.0 -DskipTests


but in either case the spark-assembly jar I get is nearly empty:

spark_official/dist/lib$ jar -tvf spark-assembly-1.2.0-SNAPSHOT-hadoop2.0.0-mr1-cdh4.2.0.jar

META-INF/
META-INF/MANIFEST.MF
org/
org/apache/
org/apache/spark/
org/apache/spark/unused/
org/apache/spark/unused/UnusedStubClass.class
META-INF/maven/
META-INF/maven/org.spark-project.spark/
META-INF/maven/org.spark-project.spark/unused/
META-INF/maven/org.spark-project.spark/unused/pom.xml
META-INF/maven/org.spark-project.spark/unused/pom.properties
META-INF/NOTICE

Any advice on how to get spark-core and the rest packaged into the assembly
jar? I'd like to have fewer things to copy around.


RE: About Memory usage in the Spark UI

2014-10-23 Thread Haopu Wang
TD, thanks for the clarification.

From the UI, it looks like the driver also allocates memory to store blocks.
What is the purpose of that, since I think the driver doesn't need to run tasks?

From: Tathagata Das [mailto:tathagata.das1...@gmail.com]
Sent: October 24, 2014, 8:07
To: Haopu Wang
Cc: Patrick Wendell; user
Subject: Re: About Memory usage in the Spark UI

The memory usage of blocks of data received through Spark Streaming is not
reflected in the Spark UI. It only shows the memory usage due to cached RDDs.

I didn't find a JIRA for this, so I opened a new one:

https://issues.apache.org/jira/browse/SPARK-4072

TD

On Thu, Oct 23, 2014 at 12:47 AM, Haopu Wang hw...@qilinsoft.com wrote:

Patrick, thanks for the response. May I ask more questions?

I'm running a Spark Streaming application which receives data from a socket and
does some transformations.

The event injection rate is too high, so the processing duration is longer than
the batch interval.

So I see the "Could not compute split, block input-0-1414049609200 not found"
issue, as discussed by others in this post:
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Could-not-compute-split-block-not-found-td11186.html#a11237

If my understanding is correct, Spark is short of storage in this case because
of the event pile-up, so it needs to delete some splits in order to free memory.

However, even in this case, I still see a very small number (like 3MB) in the
Memory Used column, whereas the total memory seems to be quite big (like 6GB).
So I think the number shown in this column may be wrong.

How does Spark calculate the total memory based on the allocated JVM heap size? I
guess it's related to the spark.storage.memoryFraction configuration, but I
want to know the details.
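
For reference, here is a rough sketch of how that total can be estimated. It assumes the Spark 1.x defaults of spark.storage.memoryFraction = 0.6 and spark.storage.safetyFraction = 0.9, both of which are configurable. The object and method names are made up for illustration, and the max heap the JVM reports is usually a bit below -Xmx, so treat the result as an upper bound rather than an exact figure.

```scala
object StorageMemoryEstimate {
  // Assumed Spark 1.x defaults; override them if your configuration differs.
  val memoryFraction = 0.6 // spark.storage.memoryFraction
  val safetyFraction = 0.9 // spark.storage.safetyFraction

  /** Approximate bytes available for storing blocks, given the JVM max heap in bytes. */
  def storageBytes(maxHeapBytes: Long): Long =
    (maxHeapBytes * memoryFraction * safetyFraction).toLong

  def main(args: Array[String]): Unit = {
    val gb = 1024L * 1024L * 1024L
    val executorHeap = 12L * gb // the 12GB executor mentioned in this thread
    println(f"${storageBytes(executorHeap).toDouble / gb}%.2f GB")
  }
}
```

For a 12GB executor heap this comes out at a bit under 6.5GB, which is in line with the total of roughly 6GB reported above.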

And why does the driver also use memory to store RDD blocks?

Thanks again for the answer!


From: Patrick Wendell [mailto:pwend...@gmail.com]
Sent: October 23, 2014, 14:00
To: Haopu Wang
Cc: user
Subject: Re: About Memory usage in the Spark UI

It shows the amount of memory used to store RDD blocks, which are created when
you run .cache()/.persist() on an RDD.

On Wed, Oct 22, 2014 at 10:07 PM, Haopu Wang hw...@qilinsoft.com wrote:

Hi, please take a look at the attached screenshot. I wonder what the
Memory Used column means.

I give 2GB of memory to the driver process and 12GB of memory to the executor process.

Thank you!



Re: unable to make a custom class as a key in a pairrdd

2014-10-23 Thread Prashant Sharma
Are you doing this in the REPL? If so, there is a bug filed for this; I just
can't recall the bug ID at the moment.

Prashant Sharma



On Fri, Oct 24, 2014 at 4:07 AM, Niklas Wilcke 
1wil...@informatik.uni-hamburg.de wrote:

  Hi Jao,

 I don't really know why this doesn't work, but I have two hints.
 You don't need to override hashCode and equals. The case modifier does
 that for you. Writing

 case class PersonID(id: String)

 would be enough to get the class you want, I think.
 If I change the type of the id param to Int it works for me, but I don't
 know why.

 case class PersonID(id: Int)

 Looks like a strange behavior to me. Have a try.
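
To illustrate the first hint, here is a minimal sketch in plain Scala (no Spark and not run in the REPL, so it does not reproduce the problem reported in this thread). It only shows that the compiler-generated equals and hashCode of a case class are enough for grouping by key. The object name CaseClassKeyDemo is made up for the example, and a local groupBy stands in for groupByKey.

```scala
// The compiler generates structural equals and hashCode for a case class.
case class PersonID(id: String)

object CaseClassKeyDemo {
  // Pair every number in 1 until 10 with the same key, as in the snippet from the thread.
  val pairs: Seq[(PersonID, Int)] = (1 until 10).map(x => (PersonID("1"), x))

  // Local stand-in for groupByKey: group the values by key using equals/hashCode.
  val grouped: Map[PersonID, Seq[Int]] =
    pairs.groupBy(_._1).map { case (key, kvs) => (key, kvs.map(_._2)) }

  def main(args: Array[String]): Unit = {
    println(PersonID("1") == PersonID("1")) // equal by value, not by reference
    grouped.foreach(println)
  }
}
```

With a single distinct key, all nine values should end up in one group here, which is the result the original snippet was expected to give.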

 Good luck,
 Niklas


 On 23.10.2014 21:52, Jaonary Rabarisoa wrote:

  Hi all,

  I have the following case class that I want to use as a key in a
  key-value RDD. I defined the equals and hashCode methods, but it's not
  working. What am I doing wrong?

  case class PersonID(id: String) {

    override def hashCode = id.hashCode

    override def equals(other: Any) = other match {
      case that: PersonID => this.id == that.id && this.getClass == that.getClass
      case _ => false
    }
  }

  val p = sc.parallelize((1 until 10).map(x => (PersonID("1"), x)))

  p.groupByKey.collect foreach println

  (PersonID(1),CompactBuffer(5))
  (PersonID(1),CompactBuffer(6))
  (PersonID(1),CompactBuffer(7))
  (PersonID(1),CompactBuffer(8, 9))
  (PersonID(1),CompactBuffer(1))
  (PersonID(1),CompactBuffer(2))
  (PersonID(1),CompactBuffer(3))
  (PersonID(1),CompactBuffer(4))


  Best,

  Jao





Re: Local Dev Env with Mesos + Spark Streaming on Docker: Can't submit jobs.

2014-10-23 Thread Svend
Hi all, 

(Waking up an old thread just for future reference) 

We had a very similar issue just a couple of days ago: executing a Spark
driver on the same host as the Mesos master succeeds, but executing it on
our remote dev station fails after Mesos reports that the Spark driver
disconnected immediately.

In our case, just setting the LIBPROCESS_IP variable as described below
resolved the issue.

https://github.com/airbnb/chronos/issues/193





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Local-Dev-Env-with-Mesos-Spark-Streaming-on-Docker-Can-t-submit-jobs-tp5397p17174.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Spark 1.0.0 on yarn cluster problem

2014-10-23 Thread firemonk9
Hi,

I am facing the same problem. My spark-env.sh has the entries below, yet I see
the YARN container with only 1G, and YARN only spawns two workers.

SPARK_EXECUTOR_CORES=1
SPARK_EXECUTOR_MEMORY=3G
SPARK_EXECUTOR_INSTANCES=5

Please let me know if you are able to resolve this issue.

Thank you



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-0-0-on-yarn-cluster-problem-tp7560p17175.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




large benchmark sets for MLlib experiments

2014-10-23 Thread Chih-Jen Lin
Hi MLlib users,

In August when I gave a talk at Databricks, Xiangrui mentioned the
need of large public data for the development of MLlib.
At the moment many people use problems from the libsvm data sets for experiments.
The file size of the larger ones (e.g., kddb) is about 20-30G.

To fulfill the need, we have provided a much larger data set,
splice_site, at the libsvm data sets page
(http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/).
The training file is around 600G, while the test file is 300G.

The full set of the same problem (3TB) was used in
"A Reliable Effective Terascale Linear Learning System"
by Agarwal et al. The original set is from
"COFFIN: A Computational Framework for Linear SVMs" by
Sonnenburg and Franc.

Please note that this problem is highly unbalanced, so
accuracy is NOT a suitable criterion. You may use
auPRC (area under precision-recall curve).

We thank the data providers, Olivier Chapelle for providing a script
for data generation, and my students for their help.

We will keep adding more large sets in the future.

Enjoy,
Chih-Jen




Memory requirement of using Spark

2014-10-23 Thread jian.t
Hello, 
I am new to Spark. I have a basic question about the memory requirement of
using Spark. 

I need to join multiple data sources between multiple data sets. The join is
not a straightforward join. The logic is more like: first join T1 with T2 on
column A; then, for all the records that had no match in that join, join T1
with T2 on column B; then join on C, and so on. I was
using HIVE, but it requires multiple scans on T1, which turns out slow.
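
To make the join logic above concrete, here is a small sketch on plain in-memory Scala collections. It is not Spark code and says nothing about the 800G case; it only illustrates the cascade (try A, then B for the leftovers, then C). The record shapes Row1 and Row2 and all names in it are hypothetical.

```scala
object CascadingJoinSketch {
  // Hypothetical record shapes; the columns a, b, c follow the description above.
  case class Row1(a: String, b: String, c: String, payload: String)
  case class Row2(a: String, b: String, c: String, value: Int)

  /** Joins t1 with t2 on the given keys; returns the matched pairs and the unmatched t1 rows. */
  def joinOn(t1: Seq[Row1], t2: Seq[Row2])(
      key1: Row1 => String,
      key2: Row2 => String): (Seq[(Row1, Row2)], Seq[Row1]) = {
    val index: Map[String, Seq[Row2]] = t2.groupBy(key2)
    val (hits, misses) = t1.partition(r => index.contains(key1(r)))
    (hits.flatMap(r => index(key1(r)).map(m => (r, m))), misses)
  }

  /** Joins on A first, then on B for rows that found no match, then on C. */
  def cascade(t1: Seq[Row1], t2: Seq[Row2]): Seq[(Row1, Row2)] = {
    val (onA, restA) = joinOn(t1, t2)(_.a, _.a)
    val (onB, restB) = joinOn(restA, t2)(_.b, _.b)
    val (onC, _) = joinOn(restB, t2)(_.c, _.c)
    onA ++ onB ++ onC
  }
}
```

Each step only looks at the rows the previous step left unmatched, so T1 shrinks as the cascade proceeds instead of being rescanned in full.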

It seems that if I load T1 and T2 into memory using Spark, I could improve the
performance. However, T1 and T2 together total around 800G. Does that mean I
need to have 800G of memory (I don't have that much)? Or could Spark do
something like streaming, and if so, would performance suffer as a result?



Thanks
JT



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Memory-requirement-of-using-Spark-tp17177.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Spark 1.0.0 on yarn cluster problem

2014-10-23 Thread Andrew Or
Did you `export` the environment variables? Also, are you running in client
mode or cluster mode? If it still doesn't work, you can try to set these
through the spark-submit command-line options --num-executors,
--executor-cores, and --executor-memory.

2014-10-23 19:25 GMT-07:00 firemonk9 dhiraj.peech...@gmail.com:

 Hi,

 I am facing same problem. My spark-env.sh has below entries yet I see the
 yarn container with only 1G and yarn only spawns two workers.

 SPARK_EXECUTOR_CORES=1
 SPARK_EXECUTOR_MEMORY=3G
 SPARK_EXECUTOR_INSTANCES=5

 Please let me know if you are able to resolve this issue.

 Thank you







Re: Multitenancy in Spark - within/across spark context

2014-10-23 Thread Evan Chan
Ashwin,

I would say the strategies in general are:

1) Have each user submit a separate Spark app (each with its own Spark
Context), with its own resource settings, and share data through HDFS
or something like Tachyon for speed.

2) Share a single Spark context amongst multiple users, using the fair
scheduler. This is sort of like having a Hadoop resource pool. It
has some obvious HA/SPOF issues, namely that if the context dies then
every user using it is also dead. Also, sharing RDDs in cached
memory has the same resiliency problems, namely that if any executor
dies then Spark must recompute / rebuild the RDD (it tries to only
rebuild the missing part, but sometimes it must rebuild everything).

Job server can help with 1 or 2, 2 in particular. If you have any
questions about job server, feel free to ask at the spark-jobserver
Google group. I am the maintainer.

-Evan


On Thu, Oct 23, 2014 at 1:06 PM, Marcelo Vanzin van...@cloudera.com wrote:
 You may want to take a look at 
 https://issues.apache.org/jira/browse/SPARK-3174.

 On Thu, Oct 23, 2014 at 2:56 AM, Jianshi Huang jianshi.hu...@gmail.com 
 wrote:
 Upvote for the multitenancy requirement.

 I'm also building a data analytics platform, and there'll be multiple users
 running queries and computations simultaneously. One of the pain points is
 control of resource size. Users don't really know how many nodes they need,
 so they always use as many as possible... The result is lots of wasted
 resources in our Yarn cluster.

 A way to 1) allow multiple Spark contexts to share the same resources or 2)
 add dynamic resource management for Yarn mode is very much wanted.

 Jianshi

 On Thu, Oct 23, 2014 at 5:36 AM, Marcelo Vanzin van...@cloudera.com wrote:

 On Wed, Oct 22, 2014 at 2:17 PM, Ashwin Shankar
 ashwinshanka...@gmail.com wrote:
  That's not something you might want to do usually. In general, a
  SparkContext maps to a user application
 
  My question was basically this. In this page in the official doc, under
  the "Scheduling within an application" section, it talks about multiuser
  and fair sharing within an app. How does multiuser within an application
  work (how do users connect to an app and run their stuff)? When would I
  want to use this?

 I see. The way I read that page is that Spark supports all those
 scheduling options; but Spark doesn't give you the means to actually
 be able to submit jobs from different users to a running SparkContext
 hosted on a different process. For that, you'll need something like
 the job server that I referenced before, or write your own framework
 for supporting that.

 Personally, I'd use the information on that page when dealing with
 concurrent jobs in the same SparkContext, but still restricted to the
 same user. I'd avoid trying to create any application where a single
 SparkContext is trying to be shared by multiple users in any way.

  As far as I understand, this will cause executors to be killed, which
  means that Spark will start retrying tasks to rebuild the data that
  was held by those executors when needed.
 
  I basically wanted to find out if there were any gotchas related to
  preemption on Spark. For example, if half of an application's executors
  got preempted while doing reduceByKey, will the application progress
  with the remaining resources/fair share?

 Jobs should still make progress as long as at least one executor is
 available. The gotcha would be the one I mentioned, where Spark will
 fail your job after x executors failed, which might be a common
 occurrence when preemption is enabled. That being said, it's a
 configurable option, so you can set x to a very large value and your
 job should keep on chugging along.

 The options you'd want to take a look at are: spark.task.maxFailures
 and spark.yarn.max.executor.failures

 --
 Marcelo





 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/



 --
 Marcelo

 -
 To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
 For additional commands, e-mail: dev-h...@spark.apache.org

