Re: SparkSQL: Nested Query error

2014-10-30 Thread SK
Hi,

 I am getting an error in the Query Plan when I use the SQL statement
exactly as you have suggested. Is that the exact SQL statement I should be
using (I am not very familiar with SQL syntax)?


I also tried using the SchemaRDD's subtract method to perform this query:
usersRDD.subtract(deviceRDD).count(). The count comes out to be 1, but
there are many UIDs in tusers that are not in device, so the result is
not correct.

I would like to know the right way to frame this query in Spark SQL.

thanks






Spark Debugging

2014-10-30 Thread Naveen Kumar Pokala
Hi,

  I have installed a 2-node Hadoop cluster (for example, on Unix machines A and
B: A is the master node and a data node, B is a data node).

  I am submitting my driver programs through Spark 1.1.0 with bin/spark-submit,
from a PuTTY client on my Windows machine.

I want to debug my program from Eclipse on my local machine, but I am not able
to find a way to do so.

Please let me know how to debug my driver program as well as the executor
programs.


Regards,

Naveen.


Re: Spark Debugging

2014-10-30 Thread Akhil Das
It's called remote debugging. You can read this article
http://www.eclipse.org/jetty/documentation/current/enable-remote-debugging.html
for more information. You will also have to make sure that your cluster and
your Windows machine can reach each other over the network.
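
For example, a minimal sketch of attaching Eclipse to the driver (class name,
jar path and port 5005 are placeholders; in yarn-cluster mode the driver runs
on a cluster node, so attach to that node rather than the submitting machine):

# Start the driver JVM listening for a remote debugger, then attach from
# Eclipse with a "Remote Java Application" launch configuration pointing at
# the driver's host and port.
bin/spark-submit \
  --class sample.spark.test.SparkJob \
  --master yarn-cluster \
  --conf "spark.driver.extraJavaOptions=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005" \
  /home/npokala/data/test.jar

Executors can be instrumented the same way through spark.executor.extraJavaOptions,
but then every executor JVM opens the same debug port on its own host.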

Thanks
Best Regards

On Thu, Oct 30, 2014 at 12:06 PM, Naveen Kumar Pokala 
npok...@spcapitaliq.com wrote:

 Hi,



   I have installed 2 node hadoop cluster (For example, on Unix machines A
 and B. A master node and data node, B is data node)



   I am submitting my driver programs through SPARK 1.1.0 with
 bin/spark-submit from Putty Client from my Windows machine.



 I want to debug my program from Eclipse on my local machine. I am not able
 to find a way to debug.



 Please let me know the ways to debug my driver program as well as executor
 programs





 Regards,



 Naveen.



Re: GC Issues with randomSplit on large dataset

2014-10-30 Thread Sean Owen
Can you be more specific about numbers?
I am not sure that splitting helps so much in the end, in that it has
the same effect as executing a smaller number at a time of the large
number of tasks that the full cartesian join would generate.
The full join is probably intractable no matter what in this case?
The OOM is not necessarily directly related. It depends on where it
happened, what else you are doing, how much memory you gave, etc.

On Thu, Oct 30, 2014 at 3:29 AM, Ganelin, Ilya
ilya.gane...@capitalone.com wrote:
 Hey all – not writing to necessarily get a fix but more to get an
 understanding of what’s going on internally here.

 I wish to take a cross-product of two very large RDDs (using cartesian), the
 product of which is well in excess of what can be stored on disk. Clearly
 that is intractable, thus my solution is to do things in batches -
 essentially I can take the cross product of a small piece of the first data
 set with the entirety of the other. To do this, I calculate how many items
 can fit into 1 gig of memory. Next, I use RDD.randomSplit() to partition
 the first data set. The issue is that I am trying to partition an RDD of
 several million items into several million partitions. This throws the
 following error:

 I would like to understand the internals of what’s going on here so that I
 can adjust my approach accordingly. Thanks in advance.


 14/10/29 22:17:44 ERROR ActorSystemImpl: Uncaught fatal error from thread
 [sparkDriver-akka.actor.default-dispatcher-16] shutting down ActorSystem
 [sparkDriver]
 java.lang.OutOfMemoryError: GC overhead limit exceeded
 at com.google.protobuf_spark.ByteString.toByteArray(ByteString.java:213)
 at akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:24)
 at
 akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:55)
 at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:55)
 at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:73)
 at
 akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:764)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
 at akka.actor.ActorCell.invoke(ActorCell.scala:456)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
 at akka.dispatch.Mailbox.run(Mailbox.scala:219)
 at
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 Exception in thread main java.lang.OutOfMemoryError: GC overhead limit
 exceeded
 at java.util.Arrays.copyOfRange(Arrays.java:2694)
 at java.lang.String.init(String.java:203)
 at java.lang.String.substring(String.java:1913)
 at java.lang.String.subSequence(String.java:1946)
 at java.util.regex.Matcher.getSubSequence(Matcher.java:1245)
 at java.util.regex.Matcher.group(Matcher.java:490)
 at java.util.Formatter$FormatSpecifier.init(Formatter.java:2675)
 at java.util.Formatter.parse(Formatter.java:2528)
 at java.util.Formatter.format(Formatter.java:2469)
 at java.util.Formatter.format(Formatter.java:2423)
 at java.lang.String.format(String.java:2790)
 at scala.collection.immutable.StringLike$class.format(StringLike.scala:266)
 at scala.collection.immutable.StringOps.format(StringOps.scala:31)
 at org.apache.spark.util.Utils$.getCallSite(Utils.scala:944)
 at org.apache.spark.rdd.RDD.init(RDD.scala:1227)
 at org.apache.spark.rdd.RDD.init(RDD.scala:83)
 at
 org.apache.spark.rdd.PartitionwiseSampledRDD.init(PartitionwiseSampledRDD.scala:47)
 at org.apache.spark.rdd.RDD$$anonfun$randomSplit$1.apply(RDD.scala:378)
 at org.apache.spark.rdd.RDD$$anonfun$randomSplit$1.apply(RDD.scala:377)
 at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
 at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
 at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
 at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
 at scala.collection.AbstractIterator.to(Iterator.scala:1157)
 at
 scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
 at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
 at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
 at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
 at org.apache.spark.rdd.RDD.randomSplit(RDD.scala:379)



 


Re: Algebird using spark-shell

2014-10-30 Thread thadude
I've tried:

./bin/spark-shell --jars algebird-core_2.10-0.8.1.jar

scala> import com.twitter.algebird._
import com.twitter.algebird._

scala> import HyperLogLog._
import HyperLogLog._

scala> import com.twitter.algebird.HyperLogLogMonoid
import com.twitter.algebird.HyperLogLogMonoid


scala> val hll = new HyperLogLogMonoid(12)
hll: com.twitter.algebird.HyperLogLogMonoid =
com.twitter.algebird.HyperLogLogMonoid@7bde289a


https://github.com/twitter/algebird/wiki/Algebird-Examples-with-REPL







NonSerializable Exception in foreachRDD

2014-10-30 Thread Harold Nguyen
Hi all,

In Spark Streaming, when I do foreachRDD on my DStreams, I get a
NonSerializable exception when I try to do something like:

DStream.foreachRDD( rdd => {
  sc.parallelize(Seq(("test", "blah")))
})

Is there any way around that ?

Thanks,

Harold


Re: BUG: when running as extends App, closures don't capture variables

2014-10-30 Thread Sean Owen
Very coincidentally I ran into something equally puzzling yesterday, where
something was bizarrely null when it couldn't have been, in a Spark program
that extends App. I also changed it to use main() and it works fine. So there
is definitely some issue here. If nobody makes a JIRA before I get home I'll
do it.
On Oct 29, 2014 11:20 PM, Michael Albert m_albert...@yahoo.com.invalid
wrote:

 Greetings!

 This might be a documentation issue as opposed to a coding issue, in that
 perhaps the correct answer is "don't do that", but as this is not obvious,
 I am writing.

 The following code produces output most would not expect:

 package misc

 import org.apache.spark.SparkConf
 import org.apache.spark.SparkContext
 import org.apache.spark.SparkContext._

 object DemoBug extends App {
     val conf = new SparkConf()
     val sc = new SparkContext(conf)

     val rdd = sc.parallelize(List("A", "B", "C", "D"))
     val str1 = "A"

     val rslt1 = rdd.filter(x => { x != "A" }).count
     val rslt2 = rdd.filter(x => { str1 != null && x != "A" }).count

     println("DemoBug: rslt1 = " + rslt1 + " rslt2 = " + rslt2)
 }

 This produces the output:
 DemoBug: rslt1 = 3 rslt2 = 0

 Compiled with sbt:
 libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.1.0"
 Run on an EC2 EMR instance with a recent image (Hadoop 2.4.0, Spark 1.1.0)

 If instead there is a proper main(), it works as expected.

 Thank you.

 Sincerely,
  Mike
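
 For reference, a minimal sketch of the main()-based variant mentioned above
 (the object name DemoMain is made up); with an explicit main() the fields are
 ordinary locals rather than delayed-initialization members of App, so the
 closure captures str1 with its expected value and both counts come out as 3:

 object DemoMain {
   def main(args: Array[String]): Unit = {
     val conf = new SparkConf()
     val sc = new SparkContext(conf)

     val rdd = sc.parallelize(List("A", "B", "C", "D"))
     val str1 = "A"

     val rslt1 = rdd.filter(x => x != "A").count()
     val rslt2 = rdd.filter(x => str1 != null && x != "A").count()

     println("DemoMain: rslt1 = " + rslt1 + " rslt2 = " + rslt2)
     sc.stop()
   }
 }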



sharing RDDs between PySpark and Scala

2014-10-30 Thread rok
I'm processing some data using PySpark and I'd like to save the RDDs to disk
(they are (k,v) RDDs of strings and SparseVector types) and read them in
using Scala to run them through some other analysis. Is this possible? 

Thanks,

Rok






Re: Spark Debugging

2014-10-30 Thread Akhil Das
Thanks
Best Regards

On Thu, Oct 30, 2014 at 1:43 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Awesome.

 Thanks
 Best Regards

 On Thu, Oct 30, 2014 at 1:30 PM, Naveen Kumar Pokala 
 npok...@spcapitaliq.com wrote:

 Thanks Akhil, It is working.



 Regards,

 Naveen



 *From:* Akhil Das [mailto:ak...@sigmoidanalytics.com]
 *Sent:* Thursday, October 30, 2014 1:24 PM

 *To:* Naveen Kumar Pokala
 *Subject:* Re: Spark Debugging



 Yes you can, but the values might not give you a complete clue. Give it a
 try


 Thanks

 Best Regards



 On Thu, Oct 30, 2014 at 1:20 PM, Naveen Kumar Pokala 
 npok...@spcapitaliq.com wrote:

 Do you mean I can't debug line by line?



 Thanks and Regards

 Naveen



 *From:* Akhil Das [mailto:ak...@sigmoidanalytics.com]
 *Sent:* Thursday, October 30, 2014 1:19 PM


 *To:* Naveen Kumar Pokala
 *Subject:* Re: Spark Debugging



 The master machine. The execution will pause in Eclipse regardless of
 where it is executing the program; you might not get a complete flow, but you
 can see what's happening.


 Thanks

 Best Regards



 On Thu, Oct 30, 2014 at 1:10 PM, Naveen Kumar Pokala 
 npok...@spcapitaliq.com wrote:

 Hi Akhil,



 With the below configuration I am successfully able to connect to Node
 A (the master node), where I am running the spark-submit program, but Spark
 may place the driver program on another node, Node B, as well.



 Which machine IP should I use to debug the programs from eclipse.



 Thanks,

 Naveen



 *From:* Akhil Das [mailto:ak...@sigmoidanalytics.com]
 *Sent:* Thursday, October 30, 2014 12:35 PM
 *To:* Naveen Kumar Pokala
 *Subject:* Re: Spark Debugging



 ​Hi Naveen,



 You should be able to connect to  port also from your windows machine.
 ​


 Thanks

 Best Regards



 On Thu, Oct 30, 2014 at 12:32 PM, Naveen Kumar Pokala 
 npok...@spcapitaliq.com wrote:

 Hi Akhil,



 I have gone through the article. But I am not starting my Java program
 with the java command; I am submitting it through spark-submit and I have
 added the options like below. Is that OK?



 bin/spark-submit --class sample.spark.test.SparkJob --master yarn-cluster
 --driver-memory 4g --executor-memory  --conf
 spark.executor.extraJavaOptions=-Xdebug
 -agentlib:jdwp=transport=dt_socket,address=,server=y,suspend=n
 /home/npokala/data/test.jar



 I am able to connect to the cluster from my Windows machine only through
 PuTTY. I am able to access the cluster information and Spark information
 from my browser. Is that sufficient?





 Regards,

 Naveen.



 *From:* Akhil Das [mailto:ak...@sigmoidanalytics.com]
 *Sent:* Thursday, October 30, 2014 12:10 PM
 *To:* Naveen Kumar Pokala
 *Cc:* user@spark.apache.org
 *Subject:* Re: Spark Debugging



 Its called remote debugging. You can read this article
 http://www.eclipse.org/jetty/documentation/current/enable-remote-debugging.html
 for more information. You will have to make sure that the network between
 your cluster and windows machine can communicate with each other.


 Thanks

 Best Regards



 On Thu, Oct 30, 2014 at 12:06 PM, Naveen Kumar Pokala 
 npok...@spcapitaliq.com wrote:

 Hi,



   I have installed 2 node hadoop cluster (For example, on Unix machines A
 and B. A master node and data node, B is data node)



   I am submitting my driver programs through SPARK 1.1.0 with
 bin/spark-submit from Putty Client from my Windows machine.



 I want to debug my program from Eclipse on my local machine. I am not
 able to find a way to debug.



 Please let me know the ways to debug my driver program as well as
 executor programs





 Regards,



 Naveen.













Spark + Tableau

2014-10-30 Thread Bojan Kostic
I'm testing the beta driver from Databricks for Tableau.
Unfortunately I have encountered some issues.
While a beeline connection works without problems, Tableau can't connect to
the Spark Thrift server.

Error from driver(Tableau):
Unable to connect to the ODBC Data Source. Check that the necessary drivers
are installed and that the connection properties are valid.
[Simba][SparkODBC] (34) Error from Spark: ETIMEDOUT.

Unable to connect to the server test.server.com. Check that the server is
running and that you have access privileges to the requested database.
Unable to connect to the server. Check that the server is running and that
you have access privileges to the requested database.

Exception on Thrift server:
java.lang.RuntimeException: org.apache.thrift.transport.TTransportException
at
org.apache.thrift.transport.TSaslServerTransport$Factory.getTransport(TSaslServerTransport.java:219)
at
org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:189)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)
Caused by: org.apache.thrift.transport.TTransportException
at
org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
at
org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
at
org.apache.thrift.transport.TSaslTransport.receiveSaslMessage(TSaslTransport.java:182)
at
org.apache.thrift.transport.TSaslServerTransport.handleSaslStartMessage(TSaslServerTransport.java:125)
at
org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:253)
at
org.apache.thrift.transport.TSaslServerTransport.open(TSaslServerTransport.java:41)
at
org.apache.thrift.transport.TSaslServerTransport$Factory.getTransport(TSaslServerTransport.java:216)
... 4 more

Is there anyone else who's testing this driver, or has anyone seen this
message?

Best regards
Bojan Kostić






Getting vector values

2014-10-30 Thread Andrejs Abele
Hi,

I'm new to MLlib and Spark. I'm trying to use TF-IDF and use those values
for term ranking.
I'm getting the TF values in vector format, but how can I get the values out
of the vector?

 val sc = new SparkContext(conf)
 val documents: RDD[Seq[String]] =
   sc.textFile("/home/andrejs/Datasets/dbpedia/test.txt").map(_.split(" ").toSeq)

 documents.foreach(println(_))
 val hashingTF = new HashingTF()
 val tf: RDD[Vector] = hashingTF.transform(documents)

 tf.foreach(println(_))

My output is :
WrappedArray(a, a, b, c)
WrappedArray(e, a, c, d)

(1048576,[97,99,100,101],[1.0,1.0,1.0,1.0])
(1048576,[97,98,99],[2.0,1.0,1.0])

How can I get [97,99,100,101] out, and [1.0,1.0,1.0,1.0]?
And how can I map that, e.g. 100 => 1.0?

Some help is greatly appreciated,

Andrejs
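
For reference, a minimal sketch (not from the thread) of reading the indices
and values out of those vectors: each vector produced by HashingTF is a
SparseVector, so its term indices and values can be read directly and zipped
into an index -> weight map.

import org.apache.spark.mllib.linalg.{SparseVector, Vector}

def termWeights(v: Vector): Map[Int, Double] = v match {
  // Sparse case: explicit indices paired with their values
  case sv: SparseVector => sv.indices.zip(sv.values).toMap
  // Fallback for dense vectors: every position is an index
  case other => other.toArray.zipWithIndex.map { case (x, i) => i -> x }.toMap
}

// tf.map(termWeights).foreach(println(_))
// would print Map(97 -> 2.0, 98 -> 1.0, 99 -> 1.0) for the second vector above.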


Re: GC Issues with randomSplit on large dataset

2014-10-30 Thread Ilya Ganelin
The split is something like 30 million into 2 million partitions. The reason
that it becomes tractable is that after I perform the Cartesian on the
split data and operate on it I don't keep the full results - I actually
only keep a tiny fraction of that generated dataset - making the overall
dataset tractable (I neglected to mention this in the first email).

The way the code is structured, I have forced linear execution until this
point, so at the time of execution of the split it is the only thing
happening. In terms of memory, I have assigned 23 GB of memory and 17 GB of
heap.
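
A rough, self-contained sketch of that batching approach (the RDD contents and
the keep() predicate are made up; only the randomSplit/cartesian pattern
matters). A few hundred batches rather than millions of partitions keeps the
per-batch task count, and the driver-side bookkeeping, manageable:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

val sc = new SparkContext(new SparkConf().setAppName("batched-cartesian"))
val firstRdd: RDD[Int]  = sc.parallelize(1 to 1000000)
val secondRdd: RDD[Int] = sc.parallelize(1 to 1000)

// Stand-in for the real filter that discards most generated pairs
def keep(a: Int, b: Int): Boolean = (a + b) % 100000 == 0

val numBatches = 200
val weights    = Array.fill(numBatches)(1.0 / numBatches)
val batches    = firstRdd.randomSplit(weights, seed = 42L)

// Cross each small batch against the full second RDD, keep only the few
// interesting pairs, and union the surviving pieces back together.
val kept = batches
  .map(batch => batch.cartesian(secondRdd).filter { case (a, b) => keep(a, b) })
  .reduce(_ union _)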
On Oct 30, 2014 3:32 AM, Sean Owen so...@cloudera.com wrote:

 Can you be more specific about numbers?
 I am not sure that splitting helps so much in the end, in that it has
 the same effect as executing a smaller number at a time of the large
 number of tasks that the full cartesian join would generate.
 The full join is probably intractable no matter what in this case?
 The OOM is not necessarily directly related. It depends on where it
 happened, what else you are doing, how much memory you gave, etc.

 On Thu, Oct 30, 2014 at 3:29 AM, Ganelin, Ilya
 ilya.gane...@capitalone.com wrote:
  Hey all – not writing to necessarily get a fix but more to get an
  understanding of what’s going on internally here.
 
  I wish to take a cross-product of two very large RDDs (using cartesian),
 the
  product of which is well in excess of what can be stored on disk .
 Clearly
  that is intractable, thus my solution is to do things in batches -
  essentially I can take the cross product of a small piece of the first
 data
  set with the entirety of the other. To do this, I calculate how many
 items
  can fit into 1 gig of memory. Next, I use RDD.randomSplit() to partition
  the first data set. The issue is that I am trying to partition an RDD of
  several million items into several million partitions. This throws the
  following error:
 
  I would like to understand the internals of what’s going on here so that
 I
  can adjust my approach accordingly. Thanks in advance.
 
 
  14/10/29 22:17:44 ERROR ActorSystemImpl: Uncaught fatal error from thread
  [sparkDriver-akka.actor.default-dispatcher-16] shutting down ActorSystem
  [sparkDriver]
  java.lang.OutOfMemoryError: GC overhead limit exceeded
  at com.google.protobuf_spark.ByteString.toByteArray(ByteString.java:213)
  at akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:24)
  at
 
 akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:55)
  at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:55)
  at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:73)
  at
 
 akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:764)
  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
  at akka.actor.ActorCell.invoke(ActorCell.scala:456)
  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
  at akka.dispatch.Mailbox.run(Mailbox.scala:219)
  at
 
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at
 
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at
 
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
  Exception in thread main java.lang.OutOfMemoryError: GC overhead limit
  exceeded
  at java.util.Arrays.copyOfRange(Arrays.java:2694)
  at java.lang.String.init(String.java:203)
  at java.lang.String.substring(String.java:1913)
  at java.lang.String.subSequence(String.java:1946)
  at java.util.regex.Matcher.getSubSequence(Matcher.java:1245)
  at java.util.regex.Matcher.group(Matcher.java:490)
  at java.util.Formatter$FormatSpecifier.init(Formatter.java:2675)
  at java.util.Formatter.parse(Formatter.java:2528)
  at java.util.Formatter.format(Formatter.java:2469)
  at java.util.Formatter.format(Formatter.java:2423)
  at java.lang.String.format(String.java:2790)
  at
 scala.collection.immutable.StringLike$class.format(StringLike.scala:266)
  at scala.collection.immutable.StringOps.format(StringOps.scala:31)
  at org.apache.spark.util.Utils$.getCallSite(Utils.scala:944)
  at org.apache.spark.rdd.RDD.init(RDD.scala:1227)
  at org.apache.spark.rdd.RDD.init(RDD.scala:83)
  at
 
 org.apache.spark.rdd.PartitionwiseSampledRDD.init(PartitionwiseSampledRDD.scala:47)
  at org.apache.spark.rdd.RDD$$anonfun$randomSplit$1.apply(RDD.scala:378)
  at org.apache.spark.rdd.RDD$$anonfun$randomSplit$1.apply(RDD.scala:377)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
  at scala.collection.Iterator$class.foreach(Iterator.scala:727)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
  at
 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
  at
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
  at

Returned type of Broadcast variable is byte array

2014-10-30 Thread Stephen Boesch
As a template for creating a broadcast variable, the following code snippet
within mllib was used:

val bcIdf = dataset.context.broadcast(idf)
dataset.mapPartitions { iter =>
  val thisIdf = bcIdf.value


The new code follows that model:

import org.apache.spark.mllib.linalg.{Vector => MVector}
  ..
assert(crows.isInstanceOf[Array[MVector]])
val bcRows = sc.broadcast(crows)
val GU = mat.rows.zipWithIndex.mapPartitions { case dataIter =>
  val arrayVect = bcRows.value // bcRows.value is seen in the
                               // debugger to be of type Array[Byte] .. ??

That last line is unhappy:

   java.lang.ClassCastException: [B cannot be cast to
[Lorg.apache.spark.mllib.linalg.Vector;

So the compiler is aware that the return type of the broadcast value
method should be an array of vector (which it should). However the actual
type is Array[Byte].   Any insights on this?
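
For comparison, a self-contained sketch of the same broadcast-and-read pattern
with made-up data (two small dense vectors), which is what the value method is
expected to return:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.{Vector => MVector, Vectors}

val sc = new SparkContext(new SparkConf().setAppName("broadcast-vectors"))

val crows: Array[MVector] = Array(Vectors.dense(1.0, 2.0), Vectors.dense(3.0, 4.0))
val bcRows = sc.broadcast(crows)

val sums = sc.parallelize(0 until 4, 2).mapPartitions { iter =>
  // Read the broadcast back on the executor; it should deserialize to Array[Vector]
  val rows: Array[MVector] = bcRows.value
  iter.map(i => rows(i % rows.length).toArray.sum)
}
println(sums.collect().mkString(", "))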


Re: Spark + Tableau

2014-10-30 Thread Jimmy
What ODBC driver are you using? We recently got the Hortonworks JODBC drivers
working on a Windows box but were having issues on a Mac.



Sent from my iPhone

 On Oct 30, 2014, at 4:23 AM, Bojan Kostic blood9ra...@gmail.com wrote:
 
 I'm testing beta driver from Databricks for Tableua.
 And unfortunately i encounter some issues.
 While beeline connection works without problems, Tableau can't connect to
 spark thrift server.
 
 Error from driver(Tableau):
 Unable to connect to the ODBC Data Source. Check that the necessary drivers
 are installed and that the connection properties are valid.
 [Simba][SparkODBC] (34) Error from Spark: ETIMEDOUT.
 
 Unable to connect to the server test.server.com. Check that the server is
 running and that you have access privileges to the requested database.
 Unable to connect to the server. Check that the server is running and that
 you have access privileges to the requested database.
 
 Exception on Thrift server:
 java.lang.RuntimeException: org.apache.thrift.transport.TTransportException
at
 org.apache.thrift.transport.TSaslServerTransport$Factory.getTransport(TSaslServerTransport.java:219)
at
 org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:189)
at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)
 Caused by: org.apache.thrift.transport.TTransportException
at
 org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
at
 org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
at
 org.apache.thrift.transport.TSaslTransport.receiveSaslMessage(TSaslTransport.java:182)
at
 org.apache.thrift.transport.TSaslServerTransport.handleSaslStartMessage(TSaslServerTransport.java:125)
at
 org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:253)
at
 org.apache.thrift.transport.TSaslServerTransport.open(TSaslServerTransport.java:41)
at
 org.apache.thrift.transport.TSaslServerTransport$Factory.getTransport(TSaslServerTransport.java:216)
... 4 more
 
 Is there anyone else who's testing this driver, or did anyone saw this
 message?
 
 Best regards
 Bojan Kostić
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Tableau-tp17720.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 




Re: Spark + Tableau

2014-10-30 Thread Bojan Kostic
I use the beta Spark SQL ODBC driver from Databricks.






Using a Database to persist and load data from

2014-10-30 Thread Asaf Lahav
Hi Ladies and Gents,
I would like to know what options I have if I want to leverage Spark code I
have already written to use a DB (Vertica) as its store/datasource.
The data is tabular in nature, so any relational DB could essentially be used.

Do I need to develop a context? If yes, how, and where can I get a good example?
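
One possible route (not from the thread) is Spark's built-in JdbcRDD; a
minimal sketch, where the connection URL, credentials, table and column names
are placeholders and the database's JDBC driver is assumed to be on the
classpath:

import java.sql.{DriverManager, ResultSet}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.JdbcRDD

val sc = new SparkContext(new SparkConf().setAppName("jdbc-read"))

val rows = new JdbcRDD(
  sc,
  () => DriverManager.getConnection("jdbc:vertica://dbhost:5433/mydb", "user", "password"),
  "SELECT id, value FROM my_table WHERE id >= ? AND id <= ?",  // the two '?' bound each partition's range
  1L, 1000000L, 10,                                            // lowerBound, upperBound, numPartitions
  (rs: ResultSet) => (rs.getLong("id"), rs.getString("value"))
)

println(rows.count())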


Thank you,
Asaf


issue on applying SVM to 5 million examples.

2014-10-30 Thread peng xia
Hi,



Previously we applied the SVM algorithm in MLlib to 5 million records (600
MB), and it takes more than 25 minutes to finish.
The Spark version we are using is 1.0 and we were running this program on a
4-node cluster. Each node has 4 CPU cores and 11 GB of RAM.

The 5 million records contain only two distinct records (one positive and one
negative); the others are all duplicates.

Does anyone have any idea why it takes so long on this small data set?



Thanks,
Best,

Peng


Re: Algebird using spark-shell

2014-10-30 Thread Ian O'Connell
What's the error with the 2.10 version of Algebird?

On Thu, Oct 30, 2014 at 12:49 AM, thadude ohpre...@yahoo.com wrote:

 I've tried:

 . /bin/spark-shell --jars algebird-core_2.10-0.8.1.jar

 scala import com.twitter.algebird._
 import com.twitter.algebird._

 scala  import HyperLogLog._
 import HyperLogLog._

 scala import com.twitter.algebird.HyperLogLogMonoid
 import com.twitter.algebird.HyperLogLogMonoid


 scala val hll = new HyperLogLogMonoid(12)
 hll: com.twitter.algebird.HyperLogLogMonoid =
 com.twitter.algebird.HyperLogLogMonoid@7bde289a


 https://github.com/twitter/algebird/wiki/Algebird-Examples-with-REPL




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Algebird-using-spark-shell-tp17701p17714.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: problem with start-slaves.sh

2014-10-30 Thread Yana Kadiyska
Roberto, I don't think Shark is an issue -- I have the Shark server running on
a node that also acts as a worker. What you can do is turn off the Shark
server and just run start-all to start your Spark cluster. Then you can try
bin/spark-shell --master <your master ip> and see if you can successfully run
some hello-world stuff. This will verify that you have a working Spark
cluster. Shark is just an application on top of it, so I can't imagine
that's what's causing interference. But stopping it is the simplest way to
check.

On Wed, Oct 29, 2014 at 10:54 PM, Pagliari, Roberto rpagli...@appcomsci.com
 wrote:

  hi Yana,
 in my case I did not start any spark worker. However, shark was definitely
 running. Do you think that might be a problem?

  I will take a look

  Thank you,

  --
 *From:* Yana Kadiyska [yana.kadiy...@gmail.com]
 *Sent:* Wednesday, October 29, 2014 9:45 AM
 *To:* Pagliari, Roberto
 *Cc:* user@spark.apache.org
 *Subject:* Re: problem with start-slaves.sh

   I see this when I start a worker and then try to start it again
 forgetting it's already running (I don't use start-slaves, I start the
 slaves individually with start-slave.sh). All this is telling you is that
 there is already a running process on that machine. You can see it if you
 do a ps -aef|grep worker

  you can look on the spark UI and see if your master shows this machine
 as connected to it already. If it doesn't, you might want to kill the
 worker process and restart it.

 On Tue, Oct 28, 2014 at 4:32 PM, Pagliari, Roberto 
 rpagli...@appcomsci.com wrote:

  I ran sbin/start-master.sh followed by sbin/start-slaves.sh (I build
 with PHive option to be able to interface with hive)



 I’m getting this



 ip_address: org.apache.spark.deploy.worker.Worker running as process
 . Stop it first.



 Am I doing something wrong? In my specific case, shark+hive is running on
 the nodes. Does that interfere with spark?



 Thank you,





RE: problem with start-slaves.sh

2014-10-30 Thread Pagliari, Roberto
I also didn't realize I was trying to bring up the secondary NameNode as a
slave; that might be an issue as well.

Thanks,


From: Yana Kadiyska [mailto:yana.kadiy...@gmail.com]
Sent: Thursday, October 30, 2014 11:27 AM
To: Pagliari, Roberto
Cc: user@spark.apache.org
Subject: Re: problem with start-slaves.sh

Roberto, I don't think shark is an issue -- I have shark server running on a 
node that also acts as a worker. What you can do is turn off shark server, just 
run start-all to start your spark cluster. then you can try bin/spark-shell 
--master yourmasterip and see if you can successfully run some hello world 
stuff. This will verify you have a working Spark cluster. Shark is just an 
application on top of it, so I can't imagine that's what's causing 
interference. But stopping it is the simplest way to check.

On Wed, Oct 29, 2014 at 10:54 PM, Pagliari, Roberto
rpagli...@appcomsci.com wrote:
hi Yana,
in my case I did not start any spark worker. However, shark was definitely 
running. Do you think that might be a problem?

I will take a look

Thank you,


From: Yana Kadiyska [yana.kadiy...@gmail.com]
Sent: Wednesday, October 29, 2014 9:45 AM
To: Pagliari, Roberto
Cc: user@spark.apache.org
Subject: Re: problem with start-slaves.sh
I see this when I start a worker and then try to start it again forgetting it's 
already running (I don't use start-slaves, I start the slaves individually with 
start-slave.sh). All this is telling you is that there is already a running 
process on that machine. You can see it if you do a ps -aef|grep worker

you can look on the spark UI and see if your master shows this machine as 
connected to it already. If it doesn't, you might want to kill the worker 
process and restart it.

On Tue, Oct 28, 2014 at 4:32 PM, Pagliari, Roberto
rpagli...@appcomsci.com wrote:
I ran sbin/start-master.sh followed by sbin/start-slaves.sh (I build with PHive 
option to be able to interface with hive)

I’m getting this

ip_address: org.apache.spark.deploy.worker.Worker running as process . Stop 
it first.

Am I doing something wrong? In my specific case, shark+hive is running on the 
nodes. Does that interfere with spark?

Thank you,




Re: Spark + Tableau

2014-10-30 Thread Denny Lee
When you are starting the thrift server service - are you connecting to it
locally or is this on a remote server when you use beeline and/or Tableau?

On Thu, Oct 30, 2014 at 8:00 AM, Bojan Kostic blood9ra...@gmail.com wrote:

 I use beta driver SQL ODBC from Databricks.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Tableau-tp17720p17727.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: issue on applying SVM to 5 million examples.

2014-10-30 Thread Jimmy
Watch the app manager; it should tell you what's running and taking a while...
My guess is it's a distinct function on the data.
J

Sent from my iPhone

 On Oct 30, 2014, at 8:22 AM, peng xia toxiap...@gmail.com wrote:
 
 Hi,
 
  
 
 Previous we have applied SVM algorithm in MLlib to 5 million records (600 
 mb), it takes more than 25 minutes to finish.
 The spark version we are using is 1.0 and we were running this program on a 4 
 nodes cluster. Each node has 4 cpu cores and 11 GB RAM.
 
 The 5 million records only have two distinct records (One positive and one 
 negative), others are all duplications.
 
 Any one has any idea on why it takes so long on this small data?
 
  
 
 Thanks,
 Best,
 
 Peng


Best way to partition RDD

2014-10-30 Thread shahab
Hi.

I am running an application in Spark which first loads data from
Cassandra and then performs some map/reduce jobs.

val srdd = sqlContext.sql("select * from mydb.mytable")

I noticed that the srdd has only one partition, no matter how big the
data loaded from Cassandra is.

So I perform a repartition on the RDD, and then I run the map/reduce
functions.

But the main problem is that the repartition takes so much time (almost 2
min), which is not acceptable in my use case. Is there any better way to do
the repartitioning?

best,
/Shahab


Re: Spark + Tableau

2014-10-30 Thread Bojan Kostic
I'm connecting to it remotely with Tableau/beeline.

On Thu Oct 30 16:51:13 2014 GMT+0100, Denny Lee [via Apache Spark User List] 
wrote:
 
 
 When you are starting the thrift server service - are you connecting to it
 locally or is this on a remote server when you use beeline and/or Tableau?
 
 On Thu, Oct 30, 2014 at 8:00 AM, Bojan Kostic blood9ra...@gmail.com wrote:
 
  I use beta driver SQL ODBC from Databricks.
 
 
 
  --
  View this message in context:
  http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Tableau-tp17720p17727.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 
 
 
 
 
 

-- 
Sent from my Jolla




Re: GC Issues with randomSplit on large dataset

2014-10-30 Thread Vladimir Rodionov
"GC overhead limit exceeded" is usually a sign of either an inadequate heap
size (not the case here) or an application that produces garbage (temporary
objects) faster than the garbage collector can collect it, so GC consumes most
of the CPU cycles. 17 GB of Java heap is quite large for many applications and
is above the safe and recommended limit (6-8 GB) for a Java server application.

From what I saw in the stack trace, I conclude that some operations in the
RDD implementation are heap polluters. I am not an expert in Spark, but it
seems that Spark is not yet well optimized to work with reasonably large
Java heaps.

One option here is to try to reduce the JVM heap size and reduce the data
size per JVM instance.

-Vladimir Rodionov



On Thu, Oct 30, 2014 at 5:22 AM, Ilya Ganelin ilgan...@gmail.com wrote:

 The split is something like 30 million into 2 milion partitions. The
 reason that it becomes tractable is that after I perform the Cartesian on
 the split data and operate on it I don't keep the full results - I actually
 only keep a tiny fraction of that generated dataset - making the overall
 dataset tractable ( I neglected to mention this in the first email).

 The way the code is structured I have forced linear execution until this
 point so at the time of execution of the split it is the only thing
 happening. In terms of memory I have assigned 23gb of memory and 17gb of
 heap.
 On Oct 30, 2014 3:32 AM, Sean Owen so...@cloudera.com wrote:

 Can you be more specific about numbers?
 I am not sure that splitting helps so much in the end, in that it has
 the same effect as executing a smaller number at a time of the large
 number of tasks that the full cartesian join would generate.
 The full join is probably intractable no matter what in this case?
 The OOM is not necessarily directly related. It depends on where it
 happened, what else you are doing, how much memory you gave, etc.

 On Thu, Oct 30, 2014 at 3:29 AM, Ganelin, Ilya
 ilya.gane...@capitalone.com wrote:
  Hey all – not writing to necessarily get a fix but more to get an
  understanding of what’s going on internally here.
 
  I wish to take a cross-product of two very large RDDs (using
 cartesian), the
  product of which is well in excess of what can be stored on disk .
 Clearly
  that is intractable, thus my solution is to do things in batches -
  essentially I can take the cross product of a small piece of the first
 data
  set with the entirety of the other. To do this, I calculate how many
 items
  can fit into 1 gig of memory. Next, I use RDD.randomSplit() to
 partition
  the first data set. The issue is that I am trying to partition an RDD of
  several million items into several million partitions. This throws the
  following error:
 
  I would like to understand the internals of what’s going on here so
 that I
  can adjust my approach accordingly. Thanks in advance.
 
 
  14/10/29 22:17:44 ERROR ActorSystemImpl: Uncaught fatal error from
 thread
  [sparkDriver-akka.actor.default-dispatcher-16] shutting down ActorSystem
  [sparkDriver]
  java.lang.OutOfMemoryError: GC overhead limit exceeded
  at com.google.protobuf_spark.ByteString.toByteArray(ByteString.java:213)
  at
 akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:24)
  at
 
 akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:55)
  at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:55)
  at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:73)
  at
 
 akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:764)
  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
  at akka.actor.ActorCell.invoke(ActorCell.scala:456)
  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
  at akka.dispatch.Mailbox.run(Mailbox.scala:219)
  at
 
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at
 
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at
 
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
  Exception in thread main java.lang.OutOfMemoryError: GC overhead limit
  exceeded
  at java.util.Arrays.copyOfRange(Arrays.java:2694)
  at java.lang.String.init(String.java:203)
  at java.lang.String.substring(String.java:1913)
  at java.lang.String.subSequence(String.java:1946)
  at java.util.regex.Matcher.getSubSequence(Matcher.java:1245)
  at java.util.regex.Matcher.group(Matcher.java:490)
  at java.util.Formatter$FormatSpecifier.init(Formatter.java:2675)
  at java.util.Formatter.parse(Formatter.java:2528)
  at java.util.Formatter.format(Formatter.java:2469)
  at java.util.Formatter.format(Formatter.java:2423)
  at java.lang.String.format(String.java:2790)
  at
 scala.collection.immutable.StringLike$class.format(StringLike.scala:266)
  at 

Re: Manipulating RDDs within a DStream

2014-10-30 Thread Harold Nguyen
Hi,

Sorry, there's a typo there:

val arr = rdd.toArray


Harold

On Thu, Oct 30, 2014 at 9:58 AM, Harold Nguyen har...@nexgate.com wrote:

 Hi all,

 I'd like to be able to modify values in a DStream, and then send it off to
 an external source like Cassandra, but I keep getting Serialization errors
 and am not sure how to use the correct design pattern. I was wondering if
 you could help me.

 I'd like to be able to do the following:

  wordCounts.foreachRDD( rdd = {

val arr = record.toArray
...

 })

 I would like to use the arr to send back to cassandra, for instance:

 Use it like this:

 val collection = sc.parallelize(Seq(a.head._1, a.head_.2))
 collection.saveToCassandra()

 Or something like that, but as you know, I can't do this within the
 foreacRDD but only at the driver level. How do I use the arr variable
 to do something like that ?

 Thanks for any help,

 Harold




Manipulating RDDs within a DStream

2014-10-30 Thread Harold Nguyen
Hi all,

I'd like to be able to modify values in a DStream, and then send it off to
an external source like Cassandra, but I keep getting Serialization errors
and am not sure how to use the correct design pattern. I was wondering if
you could help me.

I'd like to be able to do the following:

 wordCounts.foreachRDD( rdd => {

   val arr = record.toArray
   ...

})

I would like to use the "arr" to send back to Cassandra, for instance:

Use it like this:

val collection = sc.parallelize(Seq(a.head._1, a.head._2))
collection.saveToCassandra()

Or something like that, but as you know, I can't do this within the
foreachRDD but only at the driver level. How do I use the "arr" variable
to do something like that?

Thanks for any help,

Harold
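
A rough sketch of writing out of foreachRDD directly, assuming the DataStax
spark-cassandra-connector is on the classpath and a keyspace/table
test.words(word text, count int) exists (all names are made up); the RDD is
saved from within foreachRDD, so nothing has to be re-parallelized on the
driver:

import org.apache.spark.streaming.dstream.DStream
import com.datastax.spark.connector._

def saveCounts(wordCounts: DStream[(String, Int)]): Unit = {
  wordCounts.foreachRDD { rdd =>
    // Each (word, count) pair is written straight to Cassandra by the executors
    rdd.saveToCassandra("test", "words", SomeColumns("word", "count"))
  }
}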


Re: Algebird using spark-shell

2014-10-30 Thread Buntu Dev
Thanks.. I was using Scala 2.11.1 and was able to
use algebird-core_2.10-0.1.11.jar with spark-shell.

On Thu, Oct 30, 2014 at 8:22 AM, Ian O'Connell i...@ianoconnell.com wrote:

 Whats the error with the 2.10 version of algebird?

 On Thu, Oct 30, 2014 at 12:49 AM, thadude ohpre...@yahoo.com wrote:

 I've tried:

 . /bin/spark-shell --jars algebird-core_2.10-0.8.1.jar

 scala import com.twitter.algebird._
 import com.twitter.algebird._

 scala  import HyperLogLog._
 import HyperLogLog._

 scala import com.twitter.algebird.HyperLogLogMonoid
 import com.twitter.algebird.HyperLogLogMonoid


 scala val hll = new HyperLogLogMonoid(12)
 hll: com.twitter.algebird.HyperLogLogMonoid =
 com.twitter.algebird.HyperLogLogMonoid@7bde289a


 https://github.com/twitter/algebird/wiki/Algebird-Examples-with-REPL




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Algebird-using-spark-shell-tp17701p17714.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Best way to partition RDD

2014-10-30 Thread shahab
Hi Helena,

Well... I am just running a toy example: I have one Cassandra node
co-located with the Spark master and one of the Spark workers, all on one
machine. I have another node which runs the second Spark worker.

/Shahab,


On Thu, Oct 30, 2014 at 6:12 PM, Helena Edelson helena.edel...@datastax.com
 wrote:

 Hi Shahab,
 -How many spark/cassandra nodes are in your cluster?
 -What is your deploy topology for spark and cassandra clusters? Are they
 co-located?

 - Helena
 @helenaedelson

 On Oct 30, 2014, at 12:16 PM, shahab shahab.mok...@gmail.com wrote:

 Hi.

 I am running an application in the Spark which first loads data from
 Cassandra and then performs some map/reduce jobs.

 val srdd = sqlContext.sql(select * from mydb.mytable   )
 I noticed that the srdd only has one partition . no matter how big is
 the data loaded form Cassandra.

 So I perform repartition on the RDD , and then I did the map/reduce
 functions.

 But the main problem is that repartition takes so much time (almost 2
 min), which is not acceptable in my use-case. Is there any better way to do
 repartitioning?

 best,
 /Shahab





Re: issue on applying SVM to 5 million examples.

2014-10-30 Thread Xiangrui Meng
Did you cache the data and check the load balancing? How many
features? Which API are you using, Scala, Java, or Python? -Xiangrui
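
For illustration, a minimal sketch of the caching being asked about, assuming
an existing SparkContext sc and a libsvm-format input (the path is a
placeholder): the training set is kept in memory so each of SVMWithSGD's
iterations reads cached partitions instead of re-reading the 600 MB input.

import org.apache.spark.mllib.classification.SVMWithSGD
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.storage.StorageLevel

val training = MLUtils.loadLibSVMFile(sc, "hdfs:///data/train.libsvm")
  .persist(StorageLevel.MEMORY_ONLY)

// Materialize the cache once, then train
println("cached " + training.count() + " examples")
val model = SVMWithSGD.train(training, numIterations = 100)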

On Thu, Oct 30, 2014 at 9:13 AM, Jimmy ji...@sellpoints.com wrote:
 Watch the app manager it should tell you what's running and taking awhile...
 My guess it's a distinct function on the data.
 J

 Sent from my iPhone

 On Oct 30, 2014, at 8:22 AM, peng xia toxiap...@gmail.com wrote:

 Hi,



 Previous we have applied SVM algorithm in MLlib to 5 million records (600
 mb), it takes more than 25 minutes to finish.
 The spark version we are using is 1.0 and we were running this program on a
 4 nodes cluster. Each node has 4 cpu cores and 11 GB RAM.

 The 5 million records only have two distinct records (One positive and one
 negative), others are all duplications.

 Any one has any idea on why it takes so long on this small data?



 Thanks,
 Best,

 Peng




Doing RDD.count in parallel, or at least parallelize it as much as possible?

2014-10-30 Thread shahab
Hi,

I noticed that the count (of an RDD) in many of my queries is the most
time-consuming step, as it seems to run in the driver process rather than
being done by the parallel worker nodes.

Is there any way to perform the count in parallel, or at least parallelize
it as much as possible?

best,
/Shahab


Re: Best way to partition RDD

2014-10-30 Thread Helena Edelson
Shahab,

Regardless, WRT Cassandra and Spark: when using the Spark Cassandra connector,
'spark.cassandra.input.split.size' passed into the SparkConf configures the
approximate number of Cassandra partitions in a Spark partition (default 10).
No repartitioning should be necessary with what you have below, but I don't
know whether you are running on one node or a cluster.

This is a good initial guide: 
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/2_loading.md#configuration-options-for-adjusting-reads
https://github.com/datastax/spark-cassandra-connector/blob/master/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/CassandraRDD.scala#L26-L37
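
For example, a minimal sketch of setting that option (host and value are
placeholders); per the note above, packing fewer Cassandra partitions into
each Spark partition yields more, smaller Spark partitions:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("cassandra-read")
  .set("spark.cassandra.connection.host", "127.0.0.1")
  .set("spark.cassandra.input.split.size", "5")   // lower than the default of 10

val sc = new SparkContext(conf)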

Cheers,
Helena
@helenaedelson

On Oct 30, 2014, at 1:12 PM, Helena Edelson helena.edel...@datastax.com wrote:

 Hi Shahab, 
 -How many spark/cassandra nodes are in your cluster?
 -What is your deploy topology for spark and cassandra clusters? Are they 
 co-located?
 
 - Helena
 @helenaedelson
 
 On Oct 30, 2014, at 12:16 PM, shahab shahab.mok...@gmail.com wrote:
 
 Hi.
 
 I am running an application in the Spark which first loads data from 
 Cassandra and then performs some map/reduce jobs.
 val srdd = sqlContext.sql(select * from mydb.mytable   )
 
 I noticed that the srdd only has one partition . no matter how big is the 
 data loaded form Cassandra.
 
 So I perform repartition on the RDD , and then I did the map/reduce 
 functions.
 
 But the main problem is that repartition takes so much time (almost 2 
 min), which is not acceptable in my use-case. Is there any better way to do 
 repartitioning?
 
 best,
 /Shahab
 



Re: MLLib: libsvm - default value initialization

2014-10-30 Thread Xiangrui Meng
You can remove 0.5 from all non-zeros. -Xiangrui
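
A sketch of that shift, assuming an existing SparkContext sc (the path is the
one from the quoted message): subtracting 0.5 from every explicitly stored
value lets an absent index stand for the desired default of 0.5 rather than 0,
keeps the data sparse, and for a linear model the resulting constant offset is
absorbed by the intercept.

import org.apache.spark.mllib.linalg.{SparseVector, Vectors}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils

val examples = MLUtils.loadLibSVMFile(sc, "mllib/data/sample_libsvm_data.txt")

val shifted = examples.map { lp =>
  // loadLibSVMFile produces sparse feature vectors
  val sv = lp.features.asInstanceOf[SparseVector]
  LabeledPoint(lp.label, Vectors.sparse(sv.size, sv.indices, sv.values.map(_ - 0.5)))
}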

On Wed, Oct 29, 2014 at 9:20 PM, Sameer Tilak ssti...@live.com wrote:
 Hi All,
 I have my sparse data in libsvm format.

 val examples: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc,
 "mllib/data/sample_libsvm_data.txt")

 I am running Linear regression. Let us say that my data has following entry:
 1 1:0  4:1

 I think it will assume 0 for indices 2 and 3, right? I would like to make
 default values to be 0.5  instead of 0. Is it possible? If not, I will have
 to switch to dense data and it will significantly increase the data size for
 me.




k-mean - result interpretation

2014-10-30 Thread mgCl2
Hello everyone,

I'm trying to use MLlib's K-means algorithm.

I tried it on raw data. Here is an example of a line contained in my input
data set:
82.9817 3281.4495

with these parameters:
*numClusters*=4
*numIterations*=20

results:
*WSSSE = 6.375371241589461E9*

Then I normalized my data:
0.02219046937793337492 0.97780953062206662508
With the same parameters, the result is now:
*WSSSE = 0.04229916511906393*

Is it normal that normalization improves my results?
Why isn't the WSSSE normalized? It seems that having smaller values leads to
a smaller WSSSE.
I'm sure I missed something here!

Florent










Re: Returned type of Broadcast variable is byte array

2014-10-30 Thread Stephen Boesch
The byte array turns out to be a serialized ObjectOutputStream that
contains  a Tuple2[ParallelCollectionRDD,Function2].

What then should be done differently in the broadcast code (which follows
the structure of an example taken from mllib)?

assert(crows.isInstanceOf[Array[MVector]])
val bcRows = sc.broadcast(crows)
..

  val arrayVect = bcRows.value



2014-10-30 7:42 GMT-07:00 Stephen Boesch java...@gmail.com:


 As a template for creating a broadcast variable, the following code
 snippet within mllib was used:

 val bcIdf = dataset.context.broadcast(idf)
 dataset.mapPartitions { iter =>
   val thisIdf = bcIdf.value


 The new code follows that model:

 import org.apache.spark.mllib.linalg.{Vector => MVector}
   ..
 assert(crows.isInstanceOf[Array[MVector]])
 val bcRows = sc.broadcast(crows)
 val GU = mat.rows.zipWithIndex.mapPartitions { case dataIter =>
   val arrayVect = bcRows.value // bcRows.value is seen in the
                                // debugger to be of type Array[Byte] .. ??

 That last line is unhappy:

java.lang.ClassCastException: [B cannot be cast to
 [Lorg.apache.spark.mllib.linalg.Vector;

 So the compiler is aware that the return type of the broadcast value
 method should be an array of vector (which it should). However the actual
 type is Array[Byte].   Any insights on this?




stage failure: java.lang.IllegalStateException: unread block data

2014-10-30 Thread freedafeng
Hi, 

I got this error when running Spark 1.1.0 to read HBase 0.98.1 through simple
Python code on an EC2 cluster. The same program runs correctly in local mode,
so this error only happens when running on a real cluster.

Here's what I got,

14/10/30 17:51:53 INFO TaskSetManager: Starting task 0.1 in stage 0.0 (TID
1, node001, ANY, 1265 bytes)
14/10/30 17:51:53 INFO TaskSetManager: Lost task 0.1 in stage 0.0 (TID 1) on
executor node001: java.lang.IllegalStateException (unread block data)
[duplicate 1]
14/10/30 17:51:53 INFO TaskSetManager: Starting task 0.2 in stage 0.0 (TID
2, node001, ANY, 1265 bytes)
14/10/30 17:51:53 INFO TaskSetManager: Lost task 0.2 in stage 0.0 (TID 2) on
executor node001: java.lang.IllegalStateException (unread block data)
[duplicate 2]
14/10/30 17:51:53 INFO TaskSetManager: Starting task 0.3 in stage 0.0 (TID
3, node001, ANY, 1265 bytes)
14/10/30 17:51:53 INFO TaskSetManager: Lost task 0.3 in stage 0.0 (TID 3) on
executor node001: java.lang.IllegalStateException (unread block data)
[duplicate 3]
14/10/30 17:51:53 ERROR TaskSetManager: Task 0 in stage 0.0 failed 4 times;
aborting job
14/10/30 17:51:53 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks
have all completed, from pool 
14/10/30 17:51:53 INFO TaskSchedulerImpl: Cancelling stage 0
14/10/30 17:51:53 INFO DAGScheduler: Failed to run first at
SerDeUtil.scala:70
Traceback (most recent call last):
  File /root/workspace/test/sparkhbase.py, line 22, in module
conf=conf2)
  File /root/spark-1.1.0/python/pyspark/context.py, line 471, in
newAPIHadoopRDD
jconf, batchSize)
  File
/root/spark-1.1.0/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py,
line 538, in __call__
  File /root/spark-1.1.0/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py,
line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling
z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0
(TID 3, node001): java.lang.IllegalStateException: unread block data
   
java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2399)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1378)
   
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1969)
   
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
   
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1776)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1346)
java.io.ObjectInputStream.readObject(ObjectInputStream.java:368)
   
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
   
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
   
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159)
   
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146)
   
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:679)
Driver stacktrace:
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at

Re: Doing RDD.count in parallel, or at least parallelize it as much as possible?

2014-10-30 Thread Sameer Farooqui
Hi Shahab,

Are you running Spark in Local, Standalone, YARN or Mesos mode?

If you're running in Standalone/YARN/Mesos, then the .count() action is
indeed automatically parallelized across multiple Executors.

When you run a .count() on an RDD, it is actually distributing tasks to
different executors to each do a local count on a local partition and then
all the tasks send their sub-counts back to the driver for final
aggregation. This sounds like the kind of behavior you're looking for.

However, in Local mode, everything runs in a single JVM (the driver +
executor), so there's no parallelization across Executors.



On Thu, Oct 30, 2014 at 10:25 AM, shahab shahab.mok...@gmail.com wrote:

 Hi,

 I noticed that the count (of an RDD) in many of my queries is the most
 time consuming one, as it runs in the driver process rather than being done by
 the parallel worker nodes.

 Is there any way to perform count in parallel, or at least parallelize
 it as much as possible?

 best,
 /Shahab



Re: Doing RDD.count in parallel, or at least parallelize it as much as possible?

2014-10-30 Thread Sameer Farooqui
By the way, in case you haven't done so, do try to .cache() the RDD before
running a .count() on it as that could make a big speed improvement.
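
For example, a minimal sketch in the scala shell (assuming the existing
SparkContext sc; the path is just a placeholder):

val data = sc.textFile("hdfs:///path/to/input")   // placeholder path
data.cache()    // marks the RDD as cacheable; nothing is materialized yet
data.count()    // the first count reads from disk and fills the cache
data.count()    // later counts run against the cached partitions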



On Thu, Oct 30, 2014 at 11:12 AM, Sameer Farooqui same...@databricks.com
wrote:

 Hi Shahab,

 Are you running Spark in Local, Standalone, YARN or Mesos mode?

 If you're running in Standalone/YARN/Mesos, then the .count() action is
 indeed automatically parallelized across multiple Executors.

 When you run a .count() on an RDD, it is actually distributing tasks to
 different executors to each do a local count on a local partition and then
 all the tasks send their sub-counts back to the driver for final
 aggregation. This sounds like the kind of behavior you're looking for.

 However, in Local mode, everything runs in a single JVM (the driver +
 executor), so there's no parallelization across Executors.



 On Thu, Oct 30, 2014 at 10:25 AM, shahab shahab.mok...@gmail.com wrote:

 Hi,

 I noticed that the count (of an RDD) in many of my queries is the most
 time consuming one, as it runs in the driver process rather than being done by
 the parallel worker nodes.

 Is there any way to perform count in parallel, or at least parallelize
 it as much as possible?

 best,
 /Shahab





Re: Ambiguous references to id : what does it mean ?

2014-10-30 Thread Terry Siu
Found this as I am having the same issue. I have exactly the same usage as
shown in Michael's join example. I tried executing a SQL statement against
the join data set with two columns that have the same name and tried to
unambiguate the column name with the table alias, but I would still get an
Unresolved attributes error back. Is there any way around this short of
renaming the columns in the join sources?
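
The fallback I'm considering is to project and rename the clashing column on
each side before joining, roughly like this (table and column names below are
made up):

val left  = sqlContext.sql("SELECT id AS left_id, name FROM t1")
val right = sqlContext.sql("SELECT id AS right_id, score FROM t2")
left.registerTempTable("left_t")
right.registerTempTable("right_t")
val joined = sqlContext.sql(
  "SELECT left_id, name, score FROM left_t JOIN right_t ON left_id = right_id")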

Thanks
-Terry


Michael Armbrust wrote
Yes, but if both tagCollection and selectedVideos have a column named id
then Spark SQL does not know which one you are referring to in the where
clause.  Here's an example with aliases:
 val x = testData2.as('x)
 val y = testData2.as('y)
val join = x.join(y, Inner, Some("x.a".attr === "y.a".attr))
On Wed, Jul 16, 2014 at 2:47 AM, Jaonary Rabarisoa <jaonary@> wrote:
My query is just a simple query that use the spark sql dsl :

tagCollection.join(selectedVideos).where('videoId === 'id)




On Tue, Jul 15, 2014 at 6:03 PM, Yin Huai <huaiyin.thu@> wrote:

Hi Jao,

Seems the SQL analyzer cannot resolve the references in the Join
condition. What is your query? Did you use the Hive Parser (your query
was
submitted through hql(...)) or the basic SQL Parser (your query was
submitted through sql(...)).

Thanks,

Yin


On Tue, Jul 15, 2014 at 8:52 AM, Jaonary Rabarisoa <jaonary@> wrote:

Hi all,

When running a join operation with Spark SQL I got the following error
:


Exception in thread main
org.apache.spark.sql.catalyst.errors.package$TreeNodeException:
Ambiguous
references to id: (id#303,List()),(id#0,List()), tree:
Filter ('videoId = 'id)
  Join Inner, None
   ParquetRelation data/tags.parquet
   Filter (name#1 = P1/cam1)
ParquetRelation data/videos.parquet


What does it mean ?


Cheers,


jao








Re: stage failure: java.lang.IllegalStateException: unread block data

2014-10-30 Thread freedafeng
The worker side has an error message like this:

14/10/30 18:29:00 INFO Worker: Asked to launch executor
app-20141030182900-0006/0 for testspark_v1
14/10/30 18:29:01 INFO ExecutorRunner: Launch command: java -cp
::/root/spark-1.1.0/conf:/root/spark-1.1.0/assembly/target/scala-2.10/spark-assembly-1.1.0-hadoop2.3.0.jar
-XX:MaxPermSize=128m -Dspark.driver.port=52552 -Xms512M -Xmx512M
org.apache.spark.executor.CoarseGrainedExecutorBackend
akka.tcp://sparkDriver@master:52552/user/CoarseGrainedScheduler 0
node001 4 akka.tcp://sparkWorker@node001:60184/user/Worker
app-20141030182900-0006
14/10/30 18:29:03 INFO Worker: Asked to kill executor
app-20141030182900-0006/0
14/10/30 18:29:03 INFO ExecutorRunner: Runner thread for executor
app-20141030182900-0006/0 interrupted
14/10/30 18:29:03 INFO ExecutorRunner: Killing process!
14/10/30 18:29:03 ERROR FileAppender: Error writing stream to file
/root/spark-1.1.0/work/app-20141030182900-0006/0/stderr
java.io.IOException: Stream Closed
at java.io.FileInputStream.readBytes(Native Method)
at java.io.FileInputStream.read(FileInputStream.java:214)
at
org.apache.spark.util.logging.FileAppender.appendStreamToFile(FileAppender.scala:70)
at
org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply$mcV$sp(FileAppender.scala:39)
at
org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply(FileAppender.scala:39)
at
org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply(FileAppender.scala:39)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311)
at
org.apache.spark.util.logging.FileAppender$$anon$1.run(FileAppender.scala:38)
14/10/30 18:29:04 INFO Worker: Executor app-20141030182900-0006/0 finished
with state KILLED exitStatus 143
14/10/30 18:29:04 INFO LocalActorRef: Message
[akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from
Actor[akka://sparkWorker/deadLetters] to
Actor[akka://sparkWorker/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkWorker%4010.180.49.228%3A52120-22#1336571562]
was not delivered. [6] dead letters encountered. This logging can be turned
off or adjusted with configuration settings 'akka.log-dead-letters' and
'akka.log-dead-letters-during-shutdown'.
14/10/30 18:29:04 ERROR EndpointWriter: AssociationError
[akka.tcp://sparkWorker@node001:60184] -
[akka.tcp://sparkExecutor@node001:37697]: Error [Association failed with
[akka.tcp://sparkExecutor@node001:37697]] [
akka.remote.EndpointAssociationException: Association failed with
[akka.tcp://sparkExecutor@node001:37697]
Caused by:
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
Connection refused: node001/10.180.49.228:37697
]
14/10/30 18:29:04 ERROR EndpointWriter: AssociationError
[akka.tcp://sparkWorker@node001:60184] -
[akka.tcp://sparkExecutor@node001:37697]: Error [Association failed with
[akka.tcp://sparkExecutor@node001:37697]] [
akka.remote.EndpointAssociationException: Association failed with
[akka.tcp://sparkExecutor@node001:37697]
Caused by:
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
Connection refused: node001/10.180.49.228:37697
]
14/10/30 18:29:04 ERROR EndpointWriter: AssociationError
[akka.tcp://sparkWorker@node001:60184] -
[akka.tcp://sparkExecutor@node001:37697]: Error [Association failed with
[akka.tcp://sparkExecutor@node001:37697]] [
akka.remote.EndpointAssociationException: Association failed with
[akka.tcp://sparkExecutor@node001:37697]
Caused by:
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
Connection refused: node001/10.180.49.228:37697
]

Thanks!





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/stage-failure-java-lang-IllegalStateException-unread-block-data-tp17751p17755.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



does updateStateByKey accept a state that is a tuple?

2014-10-30 Thread spr
I'm trying to implement a Spark Streaming program to calculate the number of
instances of a given key encountered and the minimum and maximum times at
which it was encountered.  updateStateByKey seems to be just the thing, but
when I define the state to be a tuple, I get compile errors I'm not
finding a way around.  Perhaps it's something simple, but I'm stumped.

var lines = ssc.textFileStream(dirArg)
var linesArray = lines.map( line => (line.split("\t")))
var newState = linesArray.map( lineArray => (lineArray(4), 1,
   Time((lineArray(0).toDouble*1000).toInt),
   Time((lineArray(0).toDouble*1000).toInt)))

val updateDhcpState = (newValues: Seq[(Int, Time, Time)], state:
Option[(Int, Time, Time)]) =>
{
  val newCount = newValues.map( x => x._1).sum
  val newMinTime = newValues.map( x => x._2).min
  val newMaxTime = newValues.map( x => x._3).max
  val (count, minTime, maxTime) = state.getOrElse((0,
Time(Int.MaxValue), Time(Int.MinValue)))

  Some((count+newCount, Seq(minTime, newMinTime).min, Seq(maxTime,
newMaxTime).max))
  //(count+newCount, Seq(minTime, newMinTime).min, Seq(maxTime,
newMaxTime).max)
}

var DhcpSvrCum = newState.updateStateByKey[(Int, Time,
Time)](updateDhcpState)

The error I get is

[info] Compiling 3 Scala sources to
/Users/spr/Documents/.../target/scala-2.10/classes...
[error] /Users/spr/Documents/...StatefulDhcpServersHisto.scala:95: value
updateStateByKey is not a member of
org.apache.spark.streaming.dstream.DStream[(String, Int,
org.apache.spark.streaming.Time, org.apache.spark.streaming.Time)]
[error] var DhcpSvrCum = newState.updateStateByKey[(Int, Time,
Time)](updateDhcpState) 

I don't understand why the String is being prepended to the tuple I expect
(Int, Time, Time).  In the main example (StatefulNetworkWordCount,  here
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
 
), the data is a stream of (String, Int) tuples created by

val wordDstream = words.map(x => (x, 1))

and the updateFunc ignores the String key in its definition

val updateFunc = (values: Seq[Int], state: Option[Int]) => {
  val currentCount = values.sum
  val previousCount = state.getOrElse(0)
  Some(currentCount + previousCount)
}

Is there some special-casing of a key with a simple (non-tuple) value?  How
could this work with a tuple value?  

Thanks in advance.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/does-updateStateByKey-accept-a-state-that-is-a-tuple-tp17756.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: issue on applying SVM to 5 million examples.

2014-10-30 Thread peng xia
Thanks for all your help.
I think I didn't cache the data. My previous cluster has expired, so I
don't have a chance to check the load balancing or the app manager.
Below is my code.
There are 18 features for each record and I am using the Scala API.

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd._
import org.apache.spark.mllib.classification.SVMWithSGD
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import java.util.Calendar

object BenchmarkClassification {
  def main(args: Array[String]) {
    // Load and parse the data file
    val conf = new SparkConf()
      .setAppName("SVM")
      .set("spark.executor.memory", "8g")
      // .set("spark.executor.extraJavaOptions", "-Xms8g -Xmx8g")
    val sc = new SparkContext(conf)
    val data = sc.textFile(args(0))
    val parsedData = data.map { line =>
      val parts = line.split(',')
      LabeledPoint(parts(0).toDouble, Vectors.dense(parts.tail.map(x => x.toDouble)))
    }
    val testData = sc.textFile(args(1))
    val testParsedData = testData.map { line =>
      val parts = line.split(',')
      LabeledPoint(parts(0).toDouble, Vectors.dense(parts.tail.map(x => x.toDouble)))
    }

    // Run training algorithm to build the model
    val numIterations = 20
    val model = SVMWithSGD.train(parsedData, numIterations)

    // Evaluate model on training examples and compute training error
    // val labelAndPreds = testParsedData.map { point =>
    //   val prediction = model.predict(point.features)
    //   (point.label, prediction)
    // }
    // val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / testParsedData.count
    // println("Training Error = " + trainErr)
    println(Calendar.getInstance().getTime())
  }
}




Thanks,
Best,
Peng

On Thu, Oct 30, 2014 at 1:23 PM, Xiangrui Meng men...@gmail.com wrote:

 DId you cache the data and check the load balancing? How many
 features? Which API are you using, Scala, Java, or Python? -Xiangrui

 On Thu, Oct 30, 2014 at 9:13 AM, Jimmy ji...@sellpoints.com wrote:
  Watch the app manager it should tell you what's running and taking
 awhile...
  My guess it's a distinct function on the data.
  J
 
  Sent from my iPhone
 
  On Oct 30, 2014, at 8:22 AM, peng xia toxiap...@gmail.com wrote:
 
  Hi,
 
 
 
  Previous we have applied SVM algorithm in MLlib to 5 million records (600
  mb), it takes more than 25 minutes to finish.
  The spark version we are using is 1.0 and we were running this program
 on a
  4 nodes cluster. Each node has 4 cpu cores and 11 GB RAM.
 
  The 5 million records only have two distinct records (One positive and
 one
  negative), others are all duplications.
 
  Any one has any idea on why it takes so long on this small data?
 
 
 
  Thanks,
  Best,
 
  Peng



Re: Algebird using spark-shell

2014-10-30 Thread Ian O'Connell
Algebird 0.8.0 has 2.11 support if you want to run in a 2.11 env.

On Thu, Oct 30, 2014 at 10:08 AM, Buntu Dev buntu...@gmail.com wrote:

 Thanks.. I was using Scala 2.11.1 and was able to
 use algebird-core_2.10-0.1.11.jar with spark-shell.

 On Thu, Oct 30, 2014 at 8:22 AM, Ian O'Connell i...@ianoconnell.com
 wrote:

 Whats the error with the 2.10 version of algebird?

 On Thu, Oct 30, 2014 at 12:49 AM, thadude ohpre...@yahoo.com wrote:

 I've tried:

 ./bin/spark-shell --jars algebird-core_2.10-0.8.1.jar

 scala import com.twitter.algebird._
 import com.twitter.algebird._

 scala  import HyperLogLog._
 import HyperLogLog._

 scala import com.twitter.algebird.HyperLogLogMonoid
 import com.twitter.algebird.HyperLogLogMonoid


 scala val hll = new HyperLogLogMonoid(12)
 hll: com.twitter.algebird.HyperLogLogMonoid =
 com.twitter.algebird.HyperLogLogMonoid@7bde289a


 https://github.com/twitter/algebird/wiki/Algebird-Examples-with-REPL




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Algebird-using-spark-shell-tp17701p17714.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Re: Out of memory with Spark Streaming

2014-10-30 Thread Chris Fregly
curious about why you're only seeing 50 records max per batch.

how many receivers are you running?  what is the rate that you're putting
data onto the stream?

per the default AWS kinesis configuration, the producer can do 1000 PUTs
per second with max 50k bytes per PUT and max 1mb per second per shard.

on the consumer side, you can only do 5 GETs per second and 2mb per second
per shard.

my hunch is that the 5 GETs per second is what's limiting your consumption
rate.

can you verify that these numbers match what you're seeing?  if so, you may
want to increase your shards and therefore the number of kinesis receivers.

otherwise, this may require some further investigation on my part.  i wanna
stay on top of this if it's an issue.

thanks for posting this, aniket!

-chris

On Fri, Sep 12, 2014 at 5:34 AM, Aniket Bhatnagar 
aniket.bhatna...@gmail.com wrote:

 Hi all

 Sorry, but this was totally my mistake. In my persistence logic, I was
 creating an async http client instance in an RDD foreach but never closing it,
 leading to memory leaks.
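
 For the record, the fix was roughly of this shape: create the client once per
 partition and always close it when the partition is done (the client class and
 method names below are made up):

 rdd.foreachPartition { records =>
   val client = new MyAsyncHttpClient()     // hypothetical client
   try {
     records.foreach(r => client.send(r))   // hypothetical call
   } finally {
     client.close()                         // the missing close() was the source of the leak
   }
 }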

 Apologies for wasting everyone's time.

 Thanks,
 Aniket

 On 12 September 2014 02:20, Tathagata Das tathagata.das1...@gmail.com
 wrote:

 Which version of spark are you running?

 If you are running the latest one, then could try running not a window
 but a simple event count on every 2 second batch, and see if you are still
 running out of memory?

 TD


 On Thu, Sep 11, 2014 at 10:34 AM, Aniket Bhatnagar 
 aniket.bhatna...@gmail.com wrote:

 I did change it to be 1 gb. It still ran out of memory but a little
 later.

 The streaming job isnt handling a lot of data. In every 2 seconds, it
 doesn't get more than 50 records. Each record size is not more than 500
 bytes.
  On Sep 11, 2014 10:54 PM, Bharat Venkat bvenkat.sp...@gmail.com
 wrote:

 You could set spark.executor.memory to something bigger than the
 default (512mb)


 On Thu, Sep 11, 2014 at 8:31 AM, Aniket Bhatnagar 
 aniket.bhatna...@gmail.com wrote:

 I am running a simple Spark Streaming program that pulls in data from
 Kinesis at a batch interval of 10 seconds, windows it for 10 seconds, maps
 data and persists to a store.

 The program is running in local mode right now and runs out of memory
 after a while. I am yet to investigate heap dumps but I think Spark isn't
 releasing memory after processing is complete. I have even tried changing
 storage level to disk only.

 Help!

 Thanks,
 Aniket







Re: Do Spark executors restrict native heap vs JVM heap?

2014-10-30 Thread Sean Owen
No, but the JVM also does not allocate memory for native code on the heap.
I don't think the heap size has any bearing on whether your native code can
allocate more memory, except that of course the heap is also taking memory.
On Oct 30, 2014 6:43 PM, Paul Wais pw...@yelp.com wrote:

 Dear Spark List,

 I have a Spark app that runs native code inside map functions.  I've
 noticed that the native code sometimes sets errno to ENOMEM indicating
 a lack of available memory.  However, I've verified that the /JVM/ has
 plenty of heap space available-- Runtime.getRuntime().freeMemory()
 shows gigabytes free and the native code needs only megabytes.  Does
 spark limit the /native/ heap size somehow?  Am poking through the
 executor code now but don't see anything obvious.

 Best Regards,
 -Paul Wais

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: k-mean - result interpretation

2014-10-30 Thread Sean Owen
Yes that is exactly it. The values are not comparable since normalization
is also shrinking all distances. Squared error is not an absolute metric.

I haven't thought about this much but maybe you are looking for something
like the silhouette coefficient?
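For reference, the WSSSE you get back is KMeansModel.computeCost, i.e. the sum
of squared distances from each point to its nearest center, so its magnitude
follows the scale of your features. A minimal sketch of how it is usually
computed (the input path, k and iteration count are placeholders):

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

val points = sc.textFile("kmeans_data.txt")          // placeholder input path
  .map(line => Vectors.dense(line.split("\\s+").map(_.toDouble)))
  .cache()

val model = KMeans.train(points, 4, 20)              // numClusters = 4, numIterations = 20
val wssse = model.computeCost(points)                // sum of squared distances, in input units
println("WSSSE = " + wssse)
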
On Oct 30, 2014 5:35 PM, mgCl2 florent.jouante...@gmail.com wrote:

 Hello everyone,

 I'm trying to use MLlib's K-means algorithm.

 I tried it on raw data. Here is an example of a line contained in my input
 data set:
 82.9817 3281.4495

 with those parameters:
 *numClusters*=4
 *numIterations*=20

 results:
 *WSSSE = 6.375371241589461E9*

 Then I normalized my data:
 0.02219046937793337492 0.97780953062206662508
 With the same parameters, result is now:
  *WSSSE= 0.04229916511906393*

 Is it normal that normalization improves my results?
 Why isn't the WSSSE normalized? It seems that having smaller values
 leads to a smaller WSSSE.
 I'm sure I missed something here!

 Florent







 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/k-mean-result-interpretation-tp17748.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




akka connection refused bug, fix?

2014-10-30 Thread freedafeng
Hi, I saw the same issue as this thread,

http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-0-1-akka-connection-refused-td9864.html

Anyone has a fix for this bug? Please?!

The log info in my worker node is like,

14/10/30 20:15:18 INFO Worker: Asked to kill executor
app-20141030201514-/0
14/10/30 20:15:18 INFO ExecutorRunner: Runner thread for executor
app-20141030201514-/0 interrupted
14/10/30 20:15:18 INFO ExecutorRunner: Killing process!
14/10/30 20:15:18 INFO Worker: Executor app-20141030201514-/0 finished
with state KILLED exitStatus 1
14/10/30 20:15:18 INFO LocalActorRef: Message
[akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from
Actor[akka://sparkWorker/deadLetters] to
Actor[akka://sparkWorker/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkWorker%4010.180.49.228%3A47087-2#-814958390]
was not delivered. [1] dead letters encountered. This logging can be turned
off or adjusted with configuration settings 'akka.log-dead-letters' and
'akka.log-dead-letters-during-shutdown'.
14/10/30 20:15:18 ERROR EndpointWriter: AssociationError
[akka.tcp://sparkWorker@node001:42816] -
[akka.tcp://sparkExecutor@node001:35811]: Error [Association failed with
[akka.tcp://sparkExecutor@node001:35811]] [
akka.remote.EndpointAssociationException: Association failed with
[akka.tcp://sparkExecutor@node001:35811]
Caused by:
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
Connection refused: node001/10.180.49.228:35811
]
14/10/30 20:15:18 ERROR EndpointWriter: AssociationError
[akka.tcp://sparkWorker@node001:42816] -
[akka.tcp://sparkExecutor@node001:35811]: Error [Association failed with
[akka.tcp://sparkExecutor@node001:35811]] [
akka.remote.EndpointAssociationException: Association failed with
[akka.tcp://sparkExecutor@node001:35811]
Caused by:
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
Connection refused: node001/10.180.49.228:35811
]
14/10/30 20:15:18 ERROR EndpointWriter: AssociationError
[akka.tcp://sparkWorker@node001:42816] -
[akka.tcp://sparkExecutor@node001:35811]: Error [Association failed with
[akka.tcp://sparkExecutor@node001:35811]] [
akka.remote.EndpointAssociationException: Association failed with
[akka.tcp://sparkExecutor@node001:35811]
Caused by:
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
Connection refused: node001/10.180.49.228:35811




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/akka-connection-refused-bug-fix-tp17764.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Registering custom metrics

2014-10-30 Thread Gerard Maas
Hi,

I've been exploring the metrics exposed by Spark and I'm wondering whether
there's a way to register job-specific metrics that could be exposed
through the existing metrics system.

Would there be an  example somewhere?

BTW, the documentation about how the metrics work could be improved. I found
out about the default servlet and the metrics/json/ endpoint from the code. I
could not find any reference to that on the dedicated doc page [1].
Probably something I could contribute if there's nobody working on that at the
moment.

-kr, Gerard.

[1]   http://spark.apache.org/docs/1.1.0/monitoring.html#Metrics


Creating a SchemaRDD from RDD of thrift classes

2014-10-30 Thread ankits
I have one job with spark that creates some RDDs of type X and persists them
in memory. The type X is an auto generated Thrift java class (not a case
class though). Now in another job, I want to convert the RDD to a SchemaRDD
using sqlContext.applySchema(). Can I derive a schema from the thrift
definitions to convert RDD[X] to SchemaRDD[X]?









--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Creating-a-SchemaRDD-from-RDD-of-thrift-classes-tp17766.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Creating a SchemaRDD from RDD of thrift classes

2014-10-30 Thread Michael Armbrust
That should be possible, although I'm not super familiar with thrift.
You'll probably need access to the generated metadata
http://people.apache.org/~thejas/thrift-0.9/javadoc/org/apache/thrift/meta_data/package-frame.html
.

Shameless plug: If you find yourself reading a lot of thrift data you
might consider writing a library that goes against the new SQL Data Source
API (https://github.com/apache/spark/pull/2475), which is about to be
merged in.  It's essentially applySchema on steroids.

This code for avro is possibly a useful reference:
https://github.com/marmbrus/sql-avro
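
A rough sketch of the applySchema route (the field names here are invented and
both are treated as strings just for illustration; in practice the names, types
and getters would come from the thrift-generated classes and metadata):

import org.apache.spark.sql._

// rddOfX: RDD[X]; getId/getName are hypothetical thrift getters.
val schema = StructType(Seq(
  StructField("id", StringType, nullable = true),
  StructField("name", StringType, nullable = true)))

val rowRDD = rddOfX.map(x => Row(x.getId.toString, x.getName))
val schemaRDD = sqlContext.applySchema(rowRDD, schema)
schemaRDD.registerTempTable("x_table")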

On Thu, Oct 30, 2014 at 2:13 PM, ankits ankitso...@gmail.com wrote:

 I have one job with spark that creates some RDDs of type X and persists
 them
 in memory. The type X is an auto generated Thrift java class (not a case
 class though). Now in another job, I want to convert the RDD to a SchemaRDD
 using sqlContext.applySchema(). Can I derive a schema from the thrift
 definitions to convert RDD[X] to SchemaRDD[X]?









 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Creating-a-SchemaRDD-from-RDD-of-thrift-classes-tp17766.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Best way to partition RDD

2014-10-30 Thread shahab
Thanks Helena, very useful comment.
But is ‘spark.cassandra.input.split.size’ only effective in a cluster and not on a
single node?

best,
/Shahab
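
(For context: if I read the guide right, the setting is just passed through the
SparkConf; the app name and the value below are only examples.)

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("cassandra-read")                         // hypothetical app name
  .set("spark.cassandra.input.split.size", "10000")     // example value, not a recommendation
val sc = new SparkContext(conf)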

On Thu, Oct 30, 2014 at 6:26 PM, Helena Edelson helena.edel...@datastax.com
 wrote:

 Shahab,

 Regardless, WRT cassandra and spark when using the spark cassandra
 connector,  ‘spark.cassandra.input.split.size’ passed into the SparkConf
 configures the approx number of Cassandra partitions in a Spark partition
 (default 10).
 No repartitioning should be necessary with what you have below, but I
 don’t know if you are running on one node or a cluster.

 This is a good initial guide:

 https://github.com/datastax/spark-cassandra-connector/blob/master/doc/2_loading.md#configuration-options-for-adjusting-reads

 https://github.com/datastax/spark-cassandra-connector/blob/master/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/CassandraRDD.scala#L26-L37

 Cheers,
 Helena
 @helenaedelson

 On Oct 30, 2014, at 1:12 PM, Helena Edelson helena.edel...@datastax.com
 wrote:

 Hi Shahab,
 -How many spark/cassandra nodes are in your cluster?
 -What is your deploy topology for spark and cassandra clusters? Are they
 co-located?

 - Helena
 @helenaedelson

 On Oct 30, 2014, at 12:16 PM, shahab shahab.mok...@gmail.com wrote:

 Hi.

 I am running an application in the Spark which first loads data from
 Cassandra and then performs some map/reduce jobs.

 val srdd = sqlContext.sql("select * from mydb.mytable")
 I noticed that the srdd only has one partition, no matter how big the
 data loaded from Cassandra is.

 So I perform repartition on the RDD , and then I did the map/reduce
 functions.

 But the main problem is that repartition takes so much time (almost 2
 min), which is not acceptable in my use-case. Is there any better way to do
 repartitioning?

 best,
 /Shahab






Re: does updateStateByKey accept a state that is a tuple?

2014-10-30 Thread spr
I think I understand how to deal with this, though I don't have all the code
working yet.  The point is that the V of (K, V) can itself be a tuple.  So
the updateFunc prototype looks something like

val updateDhcpState = (newValues: Seq[Tuple1[(Int, Time, Time)]], state:
Option[Tuple1[(Int, Time, Time)]]) =>
Option[Tuple1[(Int, Time, Time)]]
{  ...   }

And I'm wondering whether those Tuple1()s are superfluous.  Film at 11.
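
If the Tuple1 wrappers do turn out to be superfluous, the keyed version would
look roughly like this (untested sketch; it assumes the pair-DStream implicits
from org.apache.spark.streaming.StreamingContext._ are in scope):

// Key by the 5th field; the value is a plain (Int, Time, Time) tuple.
val keyed = linesArray.map( a => (a(4), (1,
    Time((a(0).toDouble*1000).toInt),
    Time((a(0).toDouble*1000).toInt))) )

val updateDhcpState = (newValues: Seq[(Int, Time, Time)],
                       state: Option[(Int, Time, Time)]) => {
  val (count, minTime, maxTime) =
    state.getOrElse((0, Time(Long.MaxValue), Time(Long.MinValue)))
  val newCount = newValues.map(_._1).sum
  val newMin = (minTime +: newValues.map(_._2)).minBy(_.milliseconds)
  val newMax = (maxTime +: newValues.map(_._3)).maxBy(_.milliseconds)
  Some((count + newCount, newMin, newMax))
}

val dhcpSvrCum = keyed.updateStateByKey[(Int, Time, Time)](updateDhcpState)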



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/does-updateStateByKey-accept-a-state-that-is-a-tuple-tp17756p17769.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



how idf is calculated

2014-10-30 Thread Andrejs Abele
Hi,
I'm writing a paper and I need to calculate tf-idf. With your help I
managed to get the results I needed, but the problem is that I need to be able
to explain how each number was obtained. So I tried to understand how idf was
calculated, and the numbers I get don't correspond to those I should get.

I have 3 documents (each line a document)
a a b c m m
e a c d e e
d j k l m m c

When I calculate tf, I get this
(1048576,[99,100,106,107,108,109],[1.0,1.0,1.0,1.0,1.0,2.0])
(1048576,[97,98,99,109],[2.0,1.0,1.0,2.0])
(1048576,[97,99,100,101],[1.0,1.0,1.0,3.0])

idf is supposedly calculated as idf = log((m + 1) / (d(t) + 1))
m - number of documents (3 in my case).
d(t) - the number of documents in which the term is present
a: log(4/3) =0.1249387366
b: log(4/2) =0.3010299957
c: log(4/4) =0
d: log(4/3) =0.1249387366
e: log(4/2) =0.3010299957
l: log(4/2) =0.3010299957
m: log(4/3) =0.1249387366

When I output the idf vector with
`idf.idf.toArray.filter(_ > 0).distinct.foreach(println(_))`
I get :
1.3862943611198906
0.28768207245178085
0.6931471805599453

I understand why there are only 3 numbers, because only 3 are unique:
log(4/2), log(4/3), log(4/4), but I don't understand how the numbers in idf
were calculated.

Best regards,
Andrejs


RE: how idf is calculated

2014-10-30 Thread Ashic Mahtab
Hi Andrejs,

The calculations are a bit different to what I've come across in
Mining Massive Datasets (2nd Ed., Ullman et al., Cambridge Press), available
here: http://www.mmds.org/

Their calculation of IDF is as follows:

IDF_i = log2(N / n_i)

where N is the number of documents and n_i is the number of documents in which
the word appears. This looks different to your IDF function.

For TF, they use:

TF_ij = f_ij / max_k f_kj

That is: for document j, the term frequency of term i in j is the number of
times i appears in j divided by the maximum number of times any term appears in
j (stop words are usually excluded when considering the maximum).

So, in your case:

TF_a1 = 2 / 2 = 1
TF_b1 = 1 / 2 = 0.5
TF_c1 = 1 / 2 = 0.5
TF_m1 = 2 / 2 = 1
...

IDF_a = log2(3 / 2) = 0.585

So, TF_a1 * IDF_a = 0.585

Wikipedia mentions an adjustment to overcome biases for long documents, by
calculating TF_ij = 0.5 + (0.5 * f_ij) / (max_k f_kj), but that doesn't change
anything for TF_a1, as the value remains 1.

In other words, my calculations don't agree with yours, and neither seem to
agree with Spark :)
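
One more guess about the gap: the values you printed look like natural logs of
(m + 1) / (d(t) + 1) rather than base-10 or base-2 logs:

math.log(4.0 / 2)   // = 0.6931471805599453  (term present in 1 of 3 docs)
math.log(4.0 / 3)   // = 0.28768207245178085 (term present in 2 of 3 docs)
math.log(4.0 / 1)   // = 1.3862943611198906  (an index present in no document)

If the formula is applied to every index of the 1048576-wide hashed vector,
indexes that never occur have d(t) = 0 and get log(4/1), which would explain the
1.386... value; d(t) = 3 gives log(1) = 0, which your filter drops. That's only
my reading, though.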
Regards,
Ashic.
Date: Thu, 30 Oct 2014 22:13:49 +
Subject: how idf is calculated
From: andr...@sindicetech.com
To: u...@spark.incubator.apache.org

Hi,

I'm writing a paper and I need to calculate tf-idf. With your help I managed
to get the results I needed, but the problem is that I need to be able to explain
how each number was obtained. So I tried to understand how idf was calculated and
the numbers I get don't correspond to those I should get.

I have 3 documents (each line a document)
a a b c m m
e a c d e e
d j k l m m c

When I calculate tf, I get this
(1048576,[99,100,106,107,108,109],[1.0,1.0,1.0,1.0,1.0,2.0])
(1048576,[97,98,99,109],[2.0,1.0,1.0,2.0])
(1048576,[97,99,100,101],[1.0,1.0,1.0,3.0])

idf is supposedly calculated as idf = log((m + 1) / (d(t) + 1))
m - number of documents (3 in my case).
d(t) - the number of documents in which the term is present
a: log(4/3) =0.1249387366
b: log(4/2) =0.3010299957
c: log(4/4) =0
d: log(4/3) =0.1249387366
e: log(4/2) =0.3010299957
l: log(4/2) =0.3010299957
m: log(4/3) =0.1249387366

When I output the idf vector with
`idf.idf.toArray.filter(_ > 0).distinct.foreach(println(_))`
I get:
1.3862943611198906
0.28768207245178085
0.6931471805599453

I understand why there are only 3 numbers, because only 3 are unique:
log(4/2), log(4/3), log(4/4), but I don't understand how the numbers in idf were
calculated.

Best regards,
Andrejs
  

Re: issue on applying SVM to 5 million examples.

2014-10-30 Thread Xiangrui Meng
Then caching should solve the problem. Otherwise, it is just loading
and parsing data from disk for each iteration. -Xiangrui
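
Roughly, reusing the variable names from your code (untested):

val parsedData = data.map { line =>
  val parts = line.split(',')
  LabeledPoint(parts(0).toDouble, Vectors.dense(parts.tail.map(_.toDouble)))
}.cache()   // keep the parsed LabeledPoints in memory across the SGD iterations

val model = SVMWithSGD.train(parsedData, numIterations)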

On Thu, Oct 30, 2014 at 11:44 AM, peng xia toxiap...@gmail.com wrote:
 Thanks for all your help.
 I think I didn't cache the data. My previous cluster was expired and I don't
 have a chance to check the load balance or app manager.
 Below is my code.
 There are 18 features for each record and I am using the Scala API.

 import org.apache.spark.SparkConf
 import org.apache.spark.SparkContext
 import org.apache.spark.SparkContext._
 import org.apache.spark.rdd._
 import org.apache.spark.mllib.classification.SVMWithSGD
 import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.mllib.linalg.Vectors
 import java.util.Calendar

 object BenchmarkClassification {
 def main(args: Array[String]) {
 // Load and parse the data file
 val conf = new SparkConf()
  .setAppName(SVM)
  .set(spark.executor.memory, 8g)
  // .set(spark.executor.extraJavaOptions, -Xms8g -Xmx8g)
val sc = new SparkContext(conf)
 val data = sc.textFile(args(0))
 val parsedData = data.map { line =
  val parts = line.split(',')
  LabeledPoint(parts(0).toDouble, Vectors.dense(parts.tail.map(x =
 x.toDouble)))
 }
 val testData = sc.textFile(args(1))
 val testParsedData = testData .map { line =
  val parts = line.split(',')
  LabeledPoint(parts(0).toDouble, Vectors.dense(parts.tail.map(x =
 x.toDouble)))
 }

 // Run training algorithm to build the model
 val numIterations = 20
 val model = SVMWithSGD.train(parsedData, numIterations)

 // Evaluate model on training examples and compute training error
 // val labelAndPreds = testParsedData.map { point =
 //   val prediction = model.predict(point.features)
 //   (point.label, prediction)
 // }
 // val trainErr = labelAndPreds.filter(r = r._1 != r._2).count.toDouble /
 testParsedData.count
 // println(Training Error =  + trainErr)
 println(Calendar.getInstance().getTime())
 }
 }




 Thanks,
 Best,
 Peng

 On Thu, Oct 30, 2014 at 1:23 PM, Xiangrui Meng men...@gmail.com wrote:

 DId you cache the data and check the load balancing? How many
 features? Which API are you using, Scala, Java, or Python? -Xiangrui

 On Thu, Oct 30, 2014 at 9:13 AM, Jimmy ji...@sellpoints.com wrote:
  Watch the app manager it should tell you what's running and taking
  awhile...
  My guess it's a distinct function on the data.
  J
 
  Sent from my iPhone
 
  On Oct 30, 2014, at 8:22 AM, peng xia toxiap...@gmail.com wrote:
 
  Hi,
 
 
 
  Previous we have applied SVM algorithm in MLlib to 5 million records
  (600
  mb), it takes more than 25 minutes to finish.
  The spark version we are using is 1.0 and we were running this program
  on a
  4 nodes cluster. Each node has 4 cpu cores and 11 GB RAM.
 
  The 5 million records only have two distinct records (One positive and
  one
  negative), others are all duplications.
 
  Any one has any idea on why it takes so long on this small data?
 
 
 
  Thanks,
  Best,
 
  Peng



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



SparkSQL + Hive Cached Table Exception

2014-10-30 Thread Jean-Pascal Billaud
Hi,

While testing SparkSQL on top of our Hive metastore, I am getting
some java.lang.ArrayIndexOutOfBoundsException while reusing a cached RDD
table.

Basically, I have a table mtable partitioned by some date field in hive
and below is the scala code I am running in spark-shell:

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc);
val rdd_mtable = sqlContext.sql("select * from mtable where date=20141028");
rdd_mtable.registerTempTable("rdd_mtable");
sqlContext.cacheTable("rdd_mtable");
sqlContext.sql("select count(*) from rdd_mtable").collect(); -- OK
sqlContext.sql("select count(*) from rdd_mtable").collect(); -- Exception

So the first collect() works just fine; however, running the second
collect(), which I expect to use the cached RDD, throws a
java.lang.ArrayIndexOutOfBoundsException; see the backtrace at the end of
this email. It seems the columnar traversal is crashing for some reason.
FYI, I am using spark ToT (234de9232bcfa212317a8073c4a82c3863b36b14).

java.lang.ArrayIndexOutOfBoundsException: 14
at org.apache.spark.sql.catalyst.expressions.GenericRow.apply(Row.scala:142)
at
org.apache.spark.sql.catalyst.expressions.BoundReference.eval(BoundAttribute.scala:37)
at
org.apache.spark.sql.catalyst.expressions.Expression.n2(Expression.scala:108)
at org.apache.spark.sql.catalyst.expressions.Add.eval(arithmetic.scala:89)
at
org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$computeSizeInBytes$1.apply(InMemoryColumnarTableScan.scala:66)
at
org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$computeSizeInBytes$1.apply(InMemoryColumnarTableScan.scala:66)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at
org.apache.spark.sql.columnar.InMemoryRelation.computeSizeInBytes(InMemoryColumnarTableScan.scala:66)
at
org.apache.spark.sql.columnar.InMemoryRelation.statistics(InMemoryColumnarTableScan.scala:87)
at
org.apache.spark.sql.columnar.InMemoryRelation.statisticsToBePropagated(InMemoryColumnarTableScan.scala:73)
at
org.apache.spark.sql.columnar.InMemoryRelation.withOutput(InMemoryColumnarTableScan.scala:147)
at
org.apache.spark.sql.CacheManager$$anonfun$useCachedData$1$$anonfun$applyOrElse$1.apply(CacheManager.scala:122)
at
org.apache.spark.sql.CacheManager$$anonfun$useCachedData$1$$anonfun$applyOrElse$1.apply(CacheManager.scala:122)
at scala.Option.map(Option.scala:145)
at
org.apache.spark.sql.CacheManager$$anonfun$useCachedData$1.applyOrElse(CacheManager.scala:122)
at
org.apache.spark.sql.CacheManager$$anonfun$useCachedData$1.applyOrElse(CacheManager.scala:119)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:162)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:191)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:147)
at
org.apache.spark.sql.CacheManager$class.useCachedData(CacheManager.scala:119)
at org.apache.spark.sql.SQLContext.useCachedData(SQLContext.scala:49)
at
org.apache.spark.sql.SQLContext$QueryExecution.withCachedData$lzycompute(SQLContext.scala:376)
at
org.apache.spark.sql.SQLContext$QueryExecution.withCachedData(SQLContext.scala:376)
at
org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan$lzycompute(SQLContext.scala:377)
at
org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan(SQLContext.scala:377)
at
org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:382)
at
org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:380)
at
org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:386)
at
org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:386)

Thanks,


Re: akka connection refused bug, fix?

2014-10-30 Thread freedafeng
followed this

http://community.cloudera.com/t5/Advanced-Analytics-Apache-Spark/Akka-Error-while-running-Spark-Jobs/td-p/18602

but the problem was not fixed..



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/akka-connection-refused-bug-fix-tp17764p17774.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



SparkContext UI

2014-10-30 Thread Stuart Horsman
Hi All,

When I load an RDD with:

data = sc.textFile(somefile)

I don't see the resulting RDD in the SparkContext gui on localhost:4040 in
/storage.

Is there something special I need to do to allow me to view this?  I tried
both the scala and python shells but got the same result.

Thanks

Stuart


Re: SparkContext UI

2014-10-30 Thread Sameer Farooqui
Hey Stuart,

The RDD won't show up under the Storage tab in the UI until it's been
cached. Basically Spark doesn't know what the RDD will look like until it's
cached, b/c up until then the RDD is just on disk (external to Spark). If
you launch some transformations + an action on an RDD that is purely on
disk, then Spark will read it from disk, compute against it and then write
the results back to disk or show you the results at the scala/python
shells. But when you run Spark workloads against purely on disk files, the
RDD won't show up in Spark's Storage UI. Hope that makes sense...

- Sameer

On Thu, Oct 30, 2014 at 4:30 PM, Stuart Horsman stuart.hors...@gmail.com
wrote:

 Hi All,

 When I load an RDD with:

 data = sc.textFile(somefile)

 I don't see the resulting RDD in the SparkContext gui on localhost:4040 in
 /storage.

 Is there something special I need to do to allow me to view this?  I tried
 but scala and python shells but same result.

 Thanks

 Stuart



Re: SparkContext UI

2014-10-30 Thread Stuart Horsman
Sorry, too quick to pull the trigger on my original email.  I should have
added that I tried using persist() and cache() but no joy.

I'm doing this:

data = sc.textFile(somedata)

data.cache

data.count()

but I still can't see anything in the storage?



On 31 October 2014 10:42, Sameer Farooqui same...@databricks.com wrote:

 Hey Stuart,

 The RDD won't show up under the Storage tab in the UI until it's been
 cached. Basically Spark doesn't know what the RDD will look like until it's
 cached, b/c up until then the RDD is just on disk (external to Spark). If
 you launch some transformations + an action on an RDD that is purely on
 disk, then Spark will read it from disk, compute against it and then write
 the results back to disk or show you the results at the scala/python
 shells. But when you run Spark workloads against purely on disk files, the
 RDD won't show up in Spark's Storage UI. Hope that makes sense...

 - Sameer

 On Thu, Oct 30, 2014 at 4:30 PM, Stuart Horsman stuart.hors...@gmail.com
 wrote:

 Hi All,

 When I load an RDD with:

 data = sc.textFile(somefile)

 I don't see the resulting RDD in the SparkContext gui on localhost:4040
 in /storage.

 Is there something special I need to do to allow me to view this?  I
 tried but scala and python shells but same result.

 Thanks

 Stuart





Re: issue on applying SVM to 5 million examples.

2014-10-30 Thread peng xia
Hi Xiangrui,

Can you give me some code example about caching, as I am new to Spark.

Thanks,
Best,
Peng

On Thu, Oct 30, 2014 at 6:57 PM, Xiangrui Meng men...@gmail.com wrote:

 Then caching should solve the problem. Otherwise, it is just loading
 and parsing data from disk for each iteration. -Xiangrui

 On Thu, Oct 30, 2014 at 11:44 AM, peng xia toxiap...@gmail.com wrote:
  Thanks for all your help.
  I think I didn't cache the data. My previous cluster was expired and I
 don't
  have a chance to check the load balance or app manager.
  Below is my code.
  There are 18 features for each record and I am using the Scala API.
 
  import org.apache.spark.SparkConf
  import org.apache.spark.SparkContext
  import org.apache.spark.SparkContext._
  import org.apache.spark.rdd._
  import org.apache.spark.mllib.classification.SVMWithSGD
  import org.apache.spark.mllib.regression.LabeledPoint
  import org.apache.spark.mllib.linalg.Vectors
  import java.util.Calendar
 
  object BenchmarkClassification {
  def main(args: Array[String]) {
  // Load and parse the data file
  val conf = new SparkConf()
   .setAppName(SVM)
   .set(spark.executor.memory, 8g)
   // .set(spark.executor.extraJavaOptions, -Xms8g -Xmx8g)
 val sc = new SparkContext(conf)
  val data = sc.textFile(args(0))
  val parsedData = data.map { line =
   val parts = line.split(',')
   LabeledPoint(parts(0).toDouble, Vectors.dense(parts.tail.map(x =
  x.toDouble)))
  }
  val testData = sc.textFile(args(1))
  val testParsedData = testData .map { line =
   val parts = line.split(',')
   LabeledPoint(parts(0).toDouble, Vectors.dense(parts.tail.map(x =
  x.toDouble)))
  }
 
  // Run training algorithm to build the model
  val numIterations = 20
  val model = SVMWithSGD.train(parsedData, numIterations)
 
  // Evaluate model on training examples and compute training error
  // val labelAndPreds = testParsedData.map { point =
  //   val prediction = model.predict(point.features)
  //   (point.label, prediction)
  // }
  // val trainErr = labelAndPreds.filter(r = r._1 != r._2).count.toDouble
 /
  testParsedData.count
  // println(Training Error =  + trainErr)
  println(Calendar.getInstance().getTime())
  }
  }
 
 
 
 
  Thanks,
  Best,
  Peng
 
  On Thu, Oct 30, 2014 at 1:23 PM, Xiangrui Meng men...@gmail.com wrote:
 
  DId you cache the data and check the load balancing? How many
  features? Which API are you using, Scala, Java, or Python? -Xiangrui
 
  On Thu, Oct 30, 2014 at 9:13 AM, Jimmy ji...@sellpoints.com wrote:
   Watch the app manager it should tell you what's running and taking
   awhile...
   My guess it's a distinct function on the data.
   J
  
   Sent from my iPhone
  
   On Oct 30, 2014, at 8:22 AM, peng xia toxiap...@gmail.com wrote:
  
   Hi,
  
  
  
   Previous we have applied SVM algorithm in MLlib to 5 million records
   (600
   mb), it takes more than 25 minutes to finish.
   The spark version we are using is 1.0 and we were running this program
   on a
   4 nodes cluster. Each node has 4 cpu cores and 11 GB RAM.
  
   The 5 million records only have two distinct records (One positive and
   one
   negative), others are all duplications.
  
   Any one has any idea on why it takes so long on this small data?
  
  
  
   Thanks,
   Best,
  
   Peng
 
 



Scaladoc

2014-10-30 Thread Alessandro Baretta
How do I build the scaladoc html files from the spark source distribution?

Alex Bareta


Re: SparkContext UI

2014-10-30 Thread Sameer Farooqui
Hi Stuart,

You're close!

Just add a () after the cache, like: data.cache()

...and then run the .count() action on it and you should be good to see it
in the Storage UI!
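
For example, in the scala shell (any input path of your own):

val data = sc.textFile("somefile")   // placeholder path
data.cache()     // note the parentheses
data.count()     // the action materializes the RDD, so it then shows up under Storage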


- Sameer

On Thu, Oct 30, 2014 at 4:50 PM, Stuart Horsman stuart.hors...@gmail.com
wrote:

 Sorry too quick to pull the trigger on my original email.  I should have
 added that I'm tried using persist() and cache() but no joy.

 I'm doing this:

 data = sc.textFile(somedata)

 data.cache

 data.count()

 but I still can't see anything in the storage?



 On 31 October 2014 10:42, Sameer Farooqui same...@databricks.com wrote:

 Hey Stuart,

 The RDD won't show up under the Storage tab in the UI until it's been
 cached. Basically Spark doesn't know what the RDD will look like until it's
 cached, b/c up until then the RDD is just on disk (external to Spark). If
 you launch some transformations + an action on an RDD that is purely on
 disk, then Spark will read it from disk, compute against it and then write
 the results back to disk or show you the results at the scala/python
 shells. But when you run Spark workloads against purely on disk files, the
 RDD won't show up in Spark's Storage UI. Hope that makes sense...

 - Sameer

 On Thu, Oct 30, 2014 at 4:30 PM, Stuart Horsman stuart.hors...@gmail.com
  wrote:

 Hi All,

 When I load an RDD with:

 data = sc.textFile(somefile)

 I don't see the resulting RDD in the SparkContext gui on localhost:4040
 in /storage.

 Is there something special I need to do to allow me to view this?  I
 tried but scala and python shells but same result.

 Thanks

 Stuart






Re: issue on applying SVM to 5 million examples.

2014-10-30 Thread Jimmy
sampleRDD.cache()

Sent from my iPhone

 On Oct 30, 2014, at 5:01 PM, peng xia toxiap...@gmail.com wrote:
 
 Hi Xiangrui, 
 
 Can you give me some code example about caching, as I am new to Spark.
 
 Thanks,
 Best,
 Peng
 
 On Thu, Oct 30, 2014 at 6:57 PM, Xiangrui Meng men...@gmail.com wrote:
 Then caching should solve the problem. Otherwise, it is just loading
 and parsing data from disk for each iteration. -Xiangrui
 
 On Thu, Oct 30, 2014 at 11:44 AM, peng xia toxiap...@gmail.com wrote:
  Thanks for all your help.
  I think I didn't cache the data. My previous cluster was expired and I 
  don't
  have a chance to check the load balance or app manager.
  Below is my code.
  There are 18 features for each record and I am using the Scala API.
 
  import org.apache.spark.SparkConf
  import org.apache.spark.SparkContext
  import org.apache.spark.SparkContext._
  import org.apache.spark.rdd._
  import org.apache.spark.mllib.classification.SVMWithSGD
  import org.apache.spark.mllib.regression.LabeledPoint
  import org.apache.spark.mllib.linalg.Vectors
  import java.util.Calendar
 
  object BenchmarkClassification {
  def main(args: Array[String]) {
  // Load and parse the data file
  val conf = new SparkConf()
   .setAppName(SVM)
   .set(spark.executor.memory, 8g)
   // .set(spark.executor.extraJavaOptions, -Xms8g -Xmx8g)
 val sc = new SparkContext(conf)
  val data = sc.textFile(args(0))
  val parsedData = data.map { line =
   val parts = line.split(',')
   LabeledPoint(parts(0).toDouble, Vectors.dense(parts.tail.map(x =
  x.toDouble)))
  }
  val testData = sc.textFile(args(1))
  val testParsedData = testData .map { line =
   val parts = line.split(',')
   LabeledPoint(parts(0).toDouble, Vectors.dense(parts.tail.map(x =
  x.toDouble)))
  }
 
  // Run training algorithm to build the model
  val numIterations = 20
  val model = SVMWithSGD.train(parsedData, numIterations)
 
  // Evaluate model on training examples and compute training error
  // val labelAndPreds = testParsedData.map { point =
  //   val prediction = model.predict(point.features)
  //   (point.label, prediction)
  // }
  // val trainErr = labelAndPreds.filter(r = r._1 != r._2).count.toDouble /
  testParsedData.count
  // println(Training Error =  + trainErr)
  println(Calendar.getInstance().getTime())
  }
  }
 
 
 
 
  Thanks,
  Best,
  Peng
 
  On Thu, Oct 30, 2014 at 1:23 PM, Xiangrui Meng men...@gmail.com wrote:
 
  DId you cache the data and check the load balancing? How many
  features? Which API are you using, Scala, Java, or Python? -Xiangrui
 
  On Thu, Oct 30, 2014 at 9:13 AM, Jimmy ji...@sellpoints.com wrote:
   Watch the app manager it should tell you what's running and taking
   awhile...
   My guess it's a distinct function on the data.
   J
  
   Sent from my iPhone
  
   On Oct 30, 2014, at 8:22 AM, peng xia toxiap...@gmail.com wrote:
  
   Hi,
  
  
  
   Previous we have applied SVM algorithm in MLlib to 5 million records
   (600
   mb), it takes more than 25 minutes to finish.
   The spark version we are using is 1.0 and we were running this program
   on a
   4 nodes cluster. Each node has 4 cpu cores and 11 GB RAM.
  
   The 5 million records only have two distinct records (One positive and
   one
   negative), others are all duplications.
  
   Any one has any idea on why it takes so long on this small data?
  
  
  
   Thanks,
   Best,
  
   Peng
 
 
 


Confused about class paths in spark 1.1.0

2014-10-30 Thread Shay Seng
Hi,

I've been trying to move up from spark 0.9.2 to 1.1.0.
I'm getting a little confused with the setup for a few different use cases,
grateful for any pointers...

(1) spark-shell with jars that are only required by the driver
(1a)
I added spark.driver.extraClassPath  /mypath/to.jar to my
spark-defaults.conf
I launched spark-shell with:  ./spark-shell

Here I see on the WebUI that spark.driver.extraClassPath has been set, but
I am NOT able to access any methods in the jar.

(1b)
I removed spark.driver.extraClassPath from my spark-defaults.conf
I launched spark-shell with:  ./spark-shell --driver.class.path
/mypath/to.jar

Again I see that the WebUI spark.driver.extraClassPath has been set.
But this time I am able to access the methods in the jar.

Q: Is spark-shell not considered the driver in this case?  Why does using
--driver.class.path on the command line behave differently from
setting it in spark-defaults.conf?


(2) Rather than adding each jar individually, is there a way to use
wildcards? Previously with SPARK_CLASS_PATH I was able to use mypath/*
 but with --driver.class.path it seems to require individual files.

tks
Shay


Re: issue on applying SVM to 5 million examples.

2014-10-30 Thread peng xia
Thanks Jimmy.
I will have a try.
Thanks very much for your guys' help.

Best,
Peng

On Thu, Oct 30, 2014 at 8:19 PM, Jimmy ji...@sellpoints.com wrote:

 sampleRDD. cache()

 Sent from my iPhone

 On Oct 30, 2014, at 5:01 PM, peng xia toxiap...@gmail.com wrote:

 Hi Xiangrui,

 Can you give me some code example about caching, as I am new to Spark.

 Thanks,
 Best,
 Peng

 On Thu, Oct 30, 2014 at 6:57 PM, Xiangrui Meng men...@gmail.com wrote:

 Then caching should solve the problem. Otherwise, it is just loading
 and parsing data from disk for each iteration. -Xiangrui

 On Thu, Oct 30, 2014 at 11:44 AM, peng xia toxiap...@gmail.com wrote:
  Thanks for all your help.
  I think I didn't cache the data. My previous cluster was expired and I
 don't
  have a chance to check the load balance or app manager.
  Below is my code.
  There are 18 features for each record and I am using the Scala API.
 
  import org.apache.spark.SparkConf
  import org.apache.spark.SparkContext
  import org.apache.spark.SparkContext._
  import org.apache.spark.rdd._
  import org.apache.spark.mllib.classification.SVMWithSGD
  import org.apache.spark.mllib.regression.LabeledPoint
  import org.apache.spark.mllib.linalg.Vectors
  import java.util.Calendar
 
  object BenchmarkClassification {
  def main(args: Array[String]) {
  // Load and parse the data file
  val conf = new SparkConf()
   .setAppName(SVM)
   .set(spark.executor.memory, 8g)
   // .set(spark.executor.extraJavaOptions, -Xms8g -Xmx8g)
 val sc = new SparkContext(conf)
  val data = sc.textFile(args(0))
  val parsedData = data.map { line =
   val parts = line.split(',')
   LabeledPoint(parts(0).toDouble, Vectors.dense(parts.tail.map(x =
  x.toDouble)))
  }
  val testData = sc.textFile(args(1))
  val testParsedData = testData .map { line =
   val parts = line.split(',')
   LabeledPoint(parts(0).toDouble, Vectors.dense(parts.tail.map(x =
  x.toDouble)))
  }
 
  // Run training algorithm to build the model
  val numIterations = 20
  val model = SVMWithSGD.train(parsedData, numIterations)
 
  // Evaluate model on training examples and compute training error
  // val labelAndPreds = testParsedData.map { point =
  //   val prediction = model.predict(point.features)
  //   (point.label, prediction)
  // }
  // val trainErr = labelAndPreds.filter(r = r._1 !=
 r._2).count.toDouble /
  testParsedData.count
  // println(Training Error =  + trainErr)
  println(Calendar.getInstance().getTime())
  }
  }
 
 
 
 
  Thanks,
  Best,
  Peng
 
  On Thu, Oct 30, 2014 at 1:23 PM, Xiangrui Meng men...@gmail.com
 wrote:
 
  DId you cache the data and check the load balancing? How many
  features? Which API are you using, Scala, Java, or Python? -Xiangrui
 
  On Thu, Oct 30, 2014 at 9:13 AM, Jimmy ji...@sellpoints.com wrote:
   Watch the app manager it should tell you what's running and taking
   awhile...
   My guess it's a distinct function on the data.
   J
  
   Sent from my iPhone
  
   On Oct 30, 2014, at 8:22 AM, peng xia toxiap...@gmail.com wrote:
  
   Hi,
  
  
  
   Previous we have applied SVM algorithm in MLlib to 5 million records
   (600
   mb), it takes more than 25 minutes to finish.
   The spark version we are using is 1.0 and we were running this
 program
   on a
   4 nodes cluster. Each node has 4 cpu cores and 11 GB RAM.
  
   The 5 million records only have two distinct records (One positive
 and
   one
   negative), others are all duplications.
  
   Any one has any idea on why it takes so long on this small data?
  
  
  
   Thanks,
   Best,
  
   Peng
 
 





Re: Confused about class paths in spark 1.1.0

2014-10-30 Thread Matei Zaharia
Try using --jars instead of the driver-only options; they should work with 
spark-shell too but they may be less tested.

Unfortunately, you do have to specify each JAR separately; you can maybe use a 
shell script to list a directory and get a big list, or set up a project that 
builds all of the dependencies into one assembly JAR.

Matei

 On Oct 30, 2014, at 5:24 PM, Shay Seng s...@urbanengines.com wrote:
 
 Hi,
 
 I've been trying to move up from spark 0.9.2 to 1.1.0. 
 I'm getting a little confused with the setup for a few different use cases, 
 grateful for any pointers...
 
 (1) spark-shell + with jars that are only required by the driver
 (1a) 
 I added spark.driver.extraClassPath  /mypath/to.jar to my 
 spark-defaults.conf
 I launched spark-shell with:  ./spark-shell
 
 Here I see on the WebUI that spark.driver.extraClassPath has been set, but I 
 am NOT able to access any methods in the jar.
 
 (1b)
 I removed spark.driver.extraClassPath from my spark-default.conf
 I launched spark-shell with  .//spark-shell --driver.class.path /mypath/to.jar
 
 Again I see that the WebUI spark.driver.extraClassPath has been set. 
 But this time I am able to access the methods in the jar. 
 
 Q: Is spark-shell not considered the driver in this case?  why does using 
 --driver.class.path on the command line have a different behavior to setting 
 it in spark-defaults.conf ?
  
 
 (2) Rather than adding each jar individually, is there a way to use 
 wildcards? Previously with SPARK_CLASS_PATH I was able to use mypath/*  but 
 with --driver.class.path it seems to require individual files.
 
 tks
 Shay


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Confused about class paths in spark 1.1.0

2014-10-30 Thread Shay Seng
--jars does indeed work, but this causes the jars to also get shipped to
the workers -- which I don't want to do for efficiency reasons.

I think you are saying that setting spark.driver.extraClassPath in
spark-defaults.conf ought to have the same behavior as providing
--driver.class.path to spark-shell. Correct? If so I will file a bug
report since this is definitely not the case.


On Thu, Oct 30, 2014 at 5:39 PM, Matei Zaharia matei.zaha...@gmail.com
wrote:

 Try using --jars instead of the driver-only options; they should work with
 spark-shell too but they may be less tested.

 Unfortunately, you do have to specify each JAR separately; you can maybe
 use a shell script to list a directory and get a big list, or set up a
 project that builds all of the dependencies into one assembly JAR.

 Matei

  On Oct 30, 2014, at 5:24 PM, Shay Seng s...@urbanengines.com wrote:
 
  Hi,
 
  I've been trying to move up from spark 0.9.2 to 1.1.0.
  I'm getting a little confused with the setup for a few different use
 cases, grateful for any pointers...
 
  (1) spark-shell + with jars that are only required by the driver
  (1a)
  I added spark.driver.extraClassPath  /mypath/to.jar to my
 spark-defaults.conf
  I launched spark-shell with:  ./spark-shell
 
  Here I see on the WebUI that spark.driver.extraClassPath has been set,
 but I am NOT able to access any methods in the jar.
 
  (1b)
  I removed spark.driver.extraClassPath from my spark-default.conf
  I launched spark-shell with  .//spark-shell --driver.class.path
 /mypath/to.jar
 
  Again I see that the WebUI spark.driver.extraClassPath has been set.
  But this time I am able to access the methods in the jar.
 
  Q: Is spark-shell not considered the driver in this case?  why does
 using --driver.class.path on the command line have a different behavior to
 setting it in spark-defaults.conf ?
 
 
  (2) Rather than adding each jar individually, is there a way to use
 wildcards? Previously with SPARK_CLASS_PATH I was able to use mypath/*
 but with --driver.class.path it seems to require individual files.
 
  tks
  Shay




Re: Confused about class paths in spark 1.1.0

2014-10-30 Thread Matei Zaharia
Yeah, I think you should file this as a bug. The problem is that JARs need to 
also be added into the Scala compiler and REPL class loader, and we probably 
don't do this for the ones in this driver config property.

Matei

 On Oct 30, 2014, at 6:07 PM, Shay Seng s...@urbanengines.com wrote:
 
 -- jars does indeed work but this causes the jars to also get shipped to 
 the workers -- which I don't want to do for efficiency reasons.
 
 I think you are saying that setting spark.driver.extraClassPath in 
 spark-default.conf  ought to have the same behavior as providing 
 --driver.class.apth  to spark-shell. Correct? If so I will file a bug 
 report since this is definitely not the case.
 
 
 On Thu, Oct 30, 2014 at 5:39 PM, Matei Zaharia matei.zaha...@gmail.com 
 mailto:matei.zaha...@gmail.com wrote:
 Try using --jars instead of the driver-only options; they should work with 
 spark-shell too but they may be less tested.
 
 Unfortunately, you do have to specify each JAR separately; you can maybe use 
 a shell script to list a directory and get a big list, or set up a project 
 that builds all of the dependencies into one assembly JAR.
 
 Matei
 
  On Oct 30, 2014, at 5:24 PM, Shay Seng s...@urbanengines.com 
  mailto:s...@urbanengines.com wrote:
 
  Hi,
 
  I've been trying to move up from spark 0.9.2 to 1.1.0.
  I'm getting a little confused with the setup for a few different use cases, 
  grateful for any pointers...
 
  (1) spark-shell + with jars that are only required by the driver
  (1a)
  I added spark.driver.extraClassPath  /mypath/to.jar to my 
  spark-defaults.conf
  I launched spark-shell with:  ./spark-shell
 
  Here I see on the WebUI that spark.driver.extraClassPath has been set, but 
  I am NOT able to access any methods in the jar.
 
  (1b)
  I removed spark.driver.extraClassPath from my spark-default.conf
  I launched spark-shell with  .//spark-shell --driver.class.path 
  /mypath/to.jar
 
  Again I see that the WebUI spark.driver.extraClassPath has been set.
  But this time I am able to access the methods in the jar.
 
  Q: Is spark-shell not considered the driver in this case?  why does using 
  --driver.class.path on the command line have a different behavior to 
  setting it in spark-defaults.conf ?
 
 
  (2) Rather than adding each jar individually, is there a way to use 
  wildcards? Previously with SPARK_CLASS_PATH I was able to use mypath/*  
  but with --driver.class.path it seems to require individual files.
 
  tks
  Shay
 
 



Re: [scala-user] Why aggregate is inconsistent?

2014-10-30 Thread Xuefeng Wu
My other question is: why does Spark not provide foldLeft,
*def foldLeft[U](zeroValue: U)(op: (U, T) => U): U*, but only aggregate?
The *def fold(zeroValue: T)(op: (T, T) => T): T* in Spark is not
deterministic either.
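
A small sketch of the point made in the quoted reply below: with a zero value
that is not the neutral element of the combine operator the parallel result
can differ, while the neutral element (the empty string here) gives a
deterministic result:

  import scala.collection.GenSeq

  val seq = GenSeq("This", "is", "an", "example")

  seq.par.aggregate("0")(_ + _, _ + _)  // may yield "0This0is0an0example"
  seq.par.aggregate("")(_ + _, _ + _)   // always "Thisisanexample"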



On Thu, Oct 30, 2014 at 3:50 PM, Jason Zaugg jza...@gmail.com wrote:

 On Thu, Oct 30, 2014 at 5:39 PM, Xuefeng Wu ben...@gmail.com wrote:

 scala> import scala.collection.GenSeq
 scala> val seq = GenSeq("This", "is", "an", "example")

 scala> seq.aggregate("0")(_ + _, _ + _)
 res0: String = 0Thisisanexample

 scala> seq.par.aggregate("0")(_ + _, _ + _)
 res1: String = 0This0is0an0example

   /** Aggregates the results of applying an operator to subsequent elements.
*
*  This is a more general form of `fold` and `reduce`. It has similar
*  semantics, but does not require the result to be a supertype of the
*  element type. It traverses the elements in different partitions
*  sequentially, using `seqop` to update the result, and then applies
*  `combop` to results from different partitions. The implementation of
*  this operation may operate on an arbitrary number of collection
*  partitions, so `combop` may be invoked an arbitrary number of times.

 ...
*  @tparam B     the type of accumulated results
*  @param z the initial value for the accumulated result of the 
 partition - this
*   will typically be the neutral element for the `seqop` 
 operator (e.g.
*   `Nil` for list concatenation or `0` for summation) and 
 may be evaluated
*   more than once
*  @param seqop an operator used to accumulate results within a 
 partition
*  @param combop   an associative operator used to combine results from 
 different partitions
*/
   def aggregate[B](z: => B)(seqop: (B, A) => B, combop: (B, B) => B): B

 The contract of aggregate allows for this, if you need deterministic
 results you need to choose z that is a the nuetral element for combop. In
 your example, this would be the empty string.
 ​
 -jason




-- 

~Yours, Xuefeng Wu/吴雪峰  敬上


Re: SparkSQL + Hive Cached Table Exception

2014-10-30 Thread Michael Armbrust
Hmmm, this looks like a bug.  Can you file a JIRA?

On Thu, Oct 30, 2014 at 4:04 PM, Jean-Pascal Billaud j...@tellapart.com
wrote:

 Hi,

 While testing SparkSQL on top of our Hive metastore, I am getting
 some java.lang.ArrayIndexOutOfBoundsException while reusing a cached RDD
 table.

 Basically, I have a table mtable partitioned by some date field in
 hive and below is the scala code I am running in spark-shell:

 val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc);
 val rdd_mtable = sqlContext.sql("select * from mtable where date=20141028");
 rdd_mtable.registerTempTable("rdd_mtable");
 sqlContext.cacheTable("rdd_mtable");
 sqlContext.sql("select count(*) from rdd_mtable").collect(); -- OK
 sqlContext.sql("select count(*) from rdd_mtable").collect(); -- Exception

 So the first collect() is working just fine; however, running the second
 collect(), which I expect to use the cached RDD, throws a
 java.lang.ArrayIndexOutOfBoundsException; see the backtrace at the end of
 this email. It seems the columnar traversal is crashing for some reason.
 FYI, I am using Spark ToT (234de9232bcfa212317a8073c4a82c3863b36b14).

 java.lang.ArrayIndexOutOfBoundsException: 14
 at
 org.apache.spark.sql.catalyst.expressions.GenericRow.apply(Row.scala:142)
 at
 org.apache.spark.sql.catalyst.expressions.BoundReference.eval(BoundAttribute.scala:37)
 at
 org.apache.spark.sql.catalyst.expressions.Expression.n2(Expression.scala:108)
 at org.apache.spark.sql.catalyst.expressions.Add.eval(arithmetic.scala:89)
 at
 org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$computeSizeInBytes$1.apply(InMemoryColumnarTableScan.scala:66)
 at
 org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$computeSizeInBytes$1.apply(InMemoryColumnarTableScan.scala:66)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
 at scala.collection.AbstractTraversable.map(Traversable.scala:105)
 at
 org.apache.spark.sql.columnar.InMemoryRelation.computeSizeInBytes(InMemoryColumnarTableScan.scala:66)
 at
 org.apache.spark.sql.columnar.InMemoryRelation.statistics(InMemoryColumnarTableScan.scala:87)
 at
 org.apache.spark.sql.columnar.InMemoryRelation.statisticsToBePropagated(InMemoryColumnarTableScan.scala:73)
 at
 org.apache.spark.sql.columnar.InMemoryRelation.withOutput(InMemoryColumnarTableScan.scala:147)
 at
 org.apache.spark.sql.CacheManager$$anonfun$useCachedData$1$$anonfun$applyOrElse$1.apply(CacheManager.scala:122)
 at
 org.apache.spark.sql.CacheManager$$anonfun$useCachedData$1$$anonfun$applyOrElse$1.apply(CacheManager.scala:122)
 at scala.Option.map(Option.scala:145)
 at
 org.apache.spark.sql.CacheManager$$anonfun$useCachedData$1.applyOrElse(CacheManager.scala:122)
 at
 org.apache.spark.sql.CacheManager$$anonfun$useCachedData$1.applyOrElse(CacheManager.scala:119)
 at
 org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144)
 at
 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:162)
 at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
 at
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
 at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
 at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
 at scala.collection.AbstractIterator.to(Iterator.scala:1157)
 at
 scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
 at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
 at
 scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
 at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
 at
 org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:191)
 at
 org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:147)
 at
 org.apache.spark.sql.CacheManager$class.useCachedData(CacheManager.scala:119)
 at org.apache.spark.sql.SQLContext.useCachedData(SQLContext.scala:49)
 at
 org.apache.spark.sql.SQLContext$QueryExecution.withCachedData$lzycompute(SQLContext.scala:376)
 at
 org.apache.spark.sql.SQLContext$QueryExecution.withCachedData(SQLContext.scala:376)
 at
 org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan$lzycompute(SQLContext.scala:377)
 at
 org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan(SQLContext.scala:377)
 at
 org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:382)
 at
 

Re: Use RDD like a Iterator

2014-10-30 Thread Zhan Zhang
RDD.toLocalIterator returns the partitions one by one, but with all elements of
each partition materialized, so it is not lazily computed within a partition.
Given the design of Spark, it is very hard to maintain the state of an iterator
across runJob calls.

  def toLocalIterator: Iterator[T] = {
    def collectPartition(p: Int): Array[T] = {
      sc.runJob(this, (iter: Iterator[T]) => iter.toArray, Seq(p), allowLocal = false).head
    }
    (0 until partitions.length).iterator.flatMap(i => collectPartition(i))
  }
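
A minimal usage sketch (assuming an existing SparkContext sc): toLocalIterator
fetches one partition at a time to the driver as the iterator advances, so only
a single partition needs to fit in driver memory, but the elements inside each
partition are still materialized eagerly.

  val rdd = sc.parallelize(1 to 1000000, numSlices = 100)

  // One runJob per partition is triggered lazily as the iterator advances.
  rdd.toLocalIterator.take(5).foreach(println)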

Thanks.

Zhan Zhang

On Oct 29, 2014, at 3:43 AM, Yanbo Liang yanboha...@gmail.com wrote:

 RDD.toLocalIterator() is the suitable solution.
 But I doubt whether it conform with the design principle of spark and RDD.
 All RDD transform is lazily computed until it end with some actions. 
 
 2014-10-29 15:28 GMT+08:00 Sean Owen so...@cloudera.com:
 Call RDD.toLocalIterator()?
 
 https://spark.apache.org/docs/latest/api/java/org/apache/spark/rdd/RDD.html
 
 On Wed, Oct 29, 2014 at 4:15 AM, Dai, Kevin yun...@ebay.com wrote:
  Hi, ALL
 
 
 
  I have a RDD[T], can I use it like a iterator.
 
  That means I can compute every element of this RDD lazily.
 
 
 
  Best Regards,
 
  Kevin.
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 
 




SizeEstimator in Spark 1.1 and high load/object allocation when reading in data

2014-10-30 Thread Erik Freed
Hi All,

We have recently moved to Spark 1.1 from 0.9 for an application handling a
fair number of very large datasets partitioned across multiple nodes. About
half of each of these large datasets is stored in off heap byte arrays and
about half in the standard Java heap.

While these datasets are being loaded from our custom HDFS 2.3 RDD, and before
we are using even a fraction of the available Java heap and native off-heap
memory, the loading slows to an absolute crawl. Our profiling of the Spark
executor makes it clear that the Spark SizeEstimator is demanding an extremely
high CPU load along with a fast and furious allocation of Object[] instances.
We do not believe we were seeing this sort of behavior in 0.9, and we have
noticed rather significant changes in this part of the BlockManager code going
from 0.9 to 1.1 and beyond. A GC run gets rid of all of the Object[] instances.

Before we spend large amounts of time either switching back to 0.9 or tracing
further toward the root cause of this, I was wondering if anyone out there has
enough experience with that part of the code (or has run into the same problem)
to help us understand what sort of root causes might lie behind this strange
behavior and, even better, what we could do to resolve them.

Any help would be very much appreciated.

cheers,
Erik


Spark Streaming Issue not running 24/7

2014-10-30 Thread sivarani
The problem is simple

I want to stream data 24/7, do some calculations, and save the result in a
csv/json file so that I can use it for visualization with dc.js/d3.js.

I opted for Spark Streaming on a YARN cluster with Kafka and tried running it
24/7.

I am using groupByKey and updateStateByKey to keep the computed historical data.

Initially streaming works fine, but after a few hours I am getting:

14/10/30 23:48:49 ERROR TaskSetManager: Task 2485162.0:3 failed 4 times;
aborting job
14/10/30 23:48:50 ERROR JobScheduler: Error running job streaming job
141469227 ms.1
org.apache.spark.SparkException: Job aborted due to stage failure: Task
2485162.0:3 failed 4 times, most recent failure: Exception failure in TID
478548 on host 172.18.152.36: java.lang.ArrayIndexOutOfBoundsException

Driver stacktrace:
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
I guess it's due to the groupByKey and updateStateByKey; I tried
groupByKey(100) to increase the number of partitions.

Also, data keeps accumulating in state: say at the 10th second 1,000 records
are in state, and by the 100th second 20,000 records are in state, of which
19,000 are no longer being updated. How do I remove them from state? I
understand the update function given to updateStateByKey can return None, but
how and when should I do that, how will I know when to return None, and how do
I save the data before dropping it?
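
One way to drop stale keys, sketched below under the assumption that each
state value carries a last-update timestamp (the names, the pairStream input
and the one-hour threshold are illustrative, not from the original code): the
update function passed to updateStateByKey may return None for a key, which
removes that key from the state.

  // State per key: (runningCount, lastUpdateTimeMillis)
  def updateFunc(newValues: Seq[Long], state: Option[(Long, Long)]): Option[(Long, Long)] = {
    val now = System.currentTimeMillis()
    if (newValues.nonEmpty) {
      val previous = state.map(_._1).getOrElse(0L)
      Some((previous + newValues.sum, now))
    } else {
      // Save the value elsewhere here if it is still needed, then expire it:
      // returning None removes the key from the state.
      state.filter { case (_, lastSeen) => now - lastSeen < 60 * 60 * 1000L }
    }
  }

  val stateStream = pairStream.updateStateByKey(updateFunc _)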

I also tried not sending any data for a few hours, but when I check the web UI
I see it marked FINISHED:

app-20141030203943- NewApp  0   6.0 GB  2014/10/30 20:39:43 hadoop  
FINISHED
4.2 h

This confuses me. The code calls awaitTermination and I did not terminate the
job myself. Will streaming stop if no data is received for a significant amount
of time? Is there any documentation on how long Spark will keep running when no
data is streamed?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Issue-not-running-24-7-tp17791.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Doing RDD.count in parallel , at at least parallelize it as much as possible?

2014-10-30 Thread Sonal Goyal
Hey Sameer,

Wouldn't local[x] run the count in parallel across the x threads?

Best Regards,
Sonal
Nube Technologies http://www.nubetech.co

http://in.linkedin.com/in/sonalgoyal
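
For reference, a rough sketch of what the distributed count described in the
quoted reply below amounts to (rdd is any existing RDD): each task counts its
own partition and the per-partition counts are summed at the driver. With
local[x] those per-partition tasks do run concurrently on the x threads, just
within a single executor/driver JVM.

  // Conceptually equivalent to rdd.count()
  val total = rdd
    .mapPartitions(iter => Iterator(iter.size.toLong))
    .reduce(_ + _)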



On Thu, Oct 30, 2014 at 11:42 PM, Sameer Farooqui same...@databricks.com
wrote:

 Hi Shahab,

 Are you running Spark in Local, Standalone, YARN or Mesos mode?

 If you're running in Standalone/YARN/Mesos, then the .count() action is
 indeed automatically parallelized across multiple Executors.

 When you run a .count() on an RDD, it is actually distributing tasks to
 different executors to each do a local count on a local partition and then
 all the tasks send their sub-counts back to the driver for final
 aggregation. This sounds like the kind of behavior you're looking for.

 However, in Local mode, everything runs in a single JVM (the driver +
 executor), so there's no parallelization across Executors.



 On Thu, Oct 30, 2014 at 10:25 AM, shahab shahab.mok...@gmail.com wrote:

 Hi,

 I noticed that the count (of RDD)  in many of my queries is the most
 time consuming one as it runs in the driver process rather then done by
 parallel worker nodes,

 Is there any way to perform count in parallel , at at least parallelize
  it as much as possible?

 best,
 /Shahab





Re: Spray client reports Exception: akka.actor.ActorSystem.dispatcher()Lscala/concurrent/ExecutionContext

2014-10-30 Thread Jianshi Huang
Hi Preshant, Chester, Mohammed,

I switched to Spark's Akka and now it works well. Thanks for the help!

(Need to exclude Akka from Spray dependencies, or specify it as provided)
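
A rough build.sbt sketch of that exclusion (artifact names and versions are
illustrative; adjust them to your setup):

  libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core" % "1.1.0" % "provided",
    // Keep Spark's Akka on the classpath and drop the Akka that Spray
    // would otherwise pull in transitively.
    ("io.spray" %% "spray-client" % "1.3.2")
      .excludeAll(ExclusionRule(organization = "com.typesafe.akka"))
  )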


Jianshi


On Thu, Oct 30, 2014 at 3:17 AM, Mohammed Guller moham...@glassbeam.com
wrote:

  I am not sure about that.



 Can you try a Spray version built with 2.2.x along with Spark 1.1 and
 include the Akka dependencies in your project’s sbt file?



 Mohammed



 *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
 *Sent:* Tuesday, October 28, 2014 8:58 PM
 *To:* Mohammed Guller
 *Cc:* user
 *Subject:* Re: Spray client reports Exception:
 akka.actor.ActorSystem.dispatcher()Lscala/concurrent/ExecutionContext



 I'm using Spark built from HEAD, I think it uses modified Akka 2.3.4,
 right?



 Jianshi



 On Wed, Oct 29, 2014 at 5:53 AM, Mohammed Guller moham...@glassbeam.com
 wrote:

 Try a version built with Akka 2.2.x



 Mohammed



 *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
 *Sent:* Tuesday, October 28, 2014 3:03 AM
 *To:* user
 *Subject:* Spray client reports Exception:
 akka.actor.ActorSystem.dispatcher()Lscala/concurrent/ExecutionContext



 Hi,



 I got the following exceptions when using Spray client to write to
 OpenTSDB using its REST API.



   Exception in thread pool-10-thread-2 java.lang.NoSuchMethodError:
 akka.actor.ActorSystem.dispatcher()Lscala/concurrent/ExecutionContext;



 It worked locally in my Intellij but failed when I launch it from
 Spark-submit.



 Google suggested it's a compatibility issue in Akka. And I'm using latest
 Spark built from the HEAD, so the Akka used in Spark-submit is 2.3.4-spark.



 I tried both Spray 1.3.2 (built for Akka 2.3.6) and 1.3.1 (built for
 2.3.4). Both failed with the same exception.



 Anyone has idea what went wrong? Need help!



 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/





 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/


Re: use additional ebs volumes for hsdf storage with spark-ec2

2014-10-30 Thread Daniel Mahler
Thanks Akhil. I tried changing /root/ephemeral-hdfs/conf/hdfs-site.xml to
have

  property
namedfs.data.dir/name

value/vol,/vol0,/vol1,/vol2,/vol3,/vol4,/vol5,/vol6,/vol7,/mnt/ephemeral-hdfs/data,/mnt2/ephemeral-hdfs/data/value
  /property

and then running

/root/ephemeral-hdfs/bin/stop-all.sh
copy-dir  /root/ephemeral-hdfs/conf/
/root/ephemeral-hdfs/bin/start-all.sh

to try to make sure the new configuration takes effect on the entire cluster.
I then ran Spark to write to the local HDFS.
It failed after filling the original /mnt* mounted drives,
without writing anything to the attached /vol* drives.

I also tried completely stopping and restarting the cluster,
but restarting resets /root/ephemeral-hdfs/conf/hdfs-site.xml to the
default state.

thanks
Daniel



On Thu, Oct 30, 2014 at 1:56 AM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 I think you can check in the core-site.xml or hdfs-site.xml file under
 /root/ephemeral-hdfs/etc/hadoop/ where you can see data node dir property
 which will be a comma separated list of volumes.

 Thanks
 Best Regards

 On Thu, Oct 30, 2014 at 5:21 AM, Daniel Mahler dmah...@gmail.com wrote:

 I started my ec2 spark cluster with

 ./ec2/spark---ebs-vol-{size=100,num=8,type=gp2} -t m3.xlarge -s 10
 launch mycluster

 I see the additional volumes attached but they do not seem to be set up
 for hdfs.
 How can I check if they are being utilized on all workers,
 and how can I get all workers to utilize the extra volumes for hdfs.
 I do not have experience using hadoop directly, only through spark.

 thanks
 Daniel





Re: NonSerializable Exception in foreachRDD

2014-10-30 Thread Tobias Pfeiffer
Harold,

just mentioning it in case you run into it: If you are in a separate
thread, there are apparently stricter limits to what you can and cannot
serialize:

val someVal
future {
  // be very careful with defining RDD operations using someVal here
  val myLocalVal = someVal
  // use myLocalVal instead
}

On Thu, Oct 30, 2014 at 4:55 PM, Harold Nguyen har...@nexgate.com wrote:

 In Spark Streaming, when I do foreachRDD on my DStreams, I get a
 NonSerializable exception when I try to do something like:

 DStream.foreachRDD( rdd = {
   var sc.parallelize(Seq((test, blah)))
 })


Is this the code you are actually using? var sc.parallelize(...) doesn't
really look like valid Scala to me.

Tobias


Re: Using a Database to persist and load data from

2014-10-30 Thread Yanbo Liang
AFAIK, you can read data from a DB with JdbcRDD, but there is no built-in
interface for writing to a DB.
JdbcRDD has some restrictions, such as requiring the SQL to have a WHERE
clause (with placeholders for the partition bounds).
For writing to a DB, you can use mapPartitions or foreachPartition and
implement it yourself.
You can refer to this example:
http://stackoverflow.com/questions/24916852/how-can-i-connect-to-a-postgresql-database-into-apache-spark-using-scala
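
A rough sketch of the write path with foreachPartition, opening one JDBC
connection per partition (the URL, credentials, table, and row shape are
placeholders; substitute the Vertica driver and your schema):

  import java.sql.DriverManager

  // rows: RDD[(Int, String)], a placeholder shape for the tabular data.
  // The Vertica JDBC driver jar must be available on the executors (e.g. via --jars).
  rows.foreachPartition { partition =>
    val conn = DriverManager.getConnection(
      "jdbc:vertica://host:5433/dbname", "user", "password")
    val stmt = conn.prepareStatement("INSERT INTO my_table (id, name) VALUES (?, ?)")
    try {
      partition.foreach { case (id, name) =>
        stmt.setInt(1, id)
        stmt.setString(2, name)
        stmt.executeUpdate()
      }
    } finally {
      stmt.close()
      conn.close()
    }
  }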

2014-10-30 23:01 GMT+08:00 Asaf Lahav asaf.la...@gmail.com:

 Hi Ladies and Gents,
 I would like to know what are the options I have if I would like to
 leverage Spark code I already have written to use a DB (Vertica) as its
 store/datasource.
 The data is of tabular nature. So any relational DB can essentially be
 used.

 Do I need to develop a context? If yes, how? where can I get a good
 example?


 Thank you,
 Asaf