Re: trouble with jsonRDD and jsonFile in pyspark

2014-08-06 Thread Davies Liu
There is a PR to fix this: https://github.com/apache/spark/pull/1802

On Tue, Aug 5, 2014 at 10:11 PM, Brad Miller bmill...@eecs.berkeley.edu wrote:
 I concur that printSchema works; the trouble just seems to happen in
 operations that use the data.

 Thanks for posting the bug.

 -Brad


 On Tue, Aug 5, 2014 at 10:05 PM, Yin Huai yh...@databricks.com wrote:

 I tried jsonRDD(...).printSchema() and it worked. It seems the problem is
 that when we take the data back to the Python side, SchemaRDD#javaToPython
 fails on your cases. I have created
 https://issues.apache.org/jira/browse/SPARK-2875 to track it.

 Thanks,

 Yin


 On Tue, Aug 5, 2014 at 9:20 PM, Brad Miller bmill...@eecs.berkeley.edu
 wrote:

 Hi All,

 I checked out and built master.  Note that Maven had a problem building
 Kafka (in my case, at least); I was unable to fix this easily so I moved on
 since it seemed unlikely to have any influence on the problem at hand.

 Master improves functionality (including the example Nicholas just
 demonstrated) but unfortunately there still seems to be a bug related to
 using dictionaries as values.  I've put some code below to illustrate the
 bug.

 # dictionary as value works fine
 >>> print sqlCtx.jsonRDD(sc.parallelize(['{"key0": {"key1": "value"}}'])).collect()
 [Row(key0=Row(key1=u'value'))]

 # dictionary as value works fine, even when inner keys are varied
 >>> print sqlCtx.jsonRDD(sc.parallelize(['{"key0": {"key1": "value1"}}', '{"key0": {"key2": "value2"}}'])).collect()
 [Row(key0=Row(key1=u'value1', key2=None)), Row(key0=Row(key1=None, key2=u'value2'))]

 # dictionary as value works fine when inner keys are missing and outer key is present
 >>> print sqlCtx.jsonRDD(sc.parallelize(['{"key0": {}}', '{"key0": {"key1": "value1"}}'])).collect()
 [Row(key0=Row(key1=None)), Row(key0=Row(key1=u'value1'))]

 # dictionary as value FAILS when outer key is missing
 >>> print sqlCtx.jsonRDD(sc.parallelize(['{}', '{"key0": {"key1": "value1"}}'])).collect()
 Py4JJavaError: An error occurred while calling o84.collect.
 : org.apache.spark.SparkException: Job aborted due to stage failure: Task
 14 in stage 7.0 failed 4 times, most recent failure: Lost task 14.3 in stage
 7.0 (TID 242, engelland.research.intel-research.net):
 java.lang.NullPointerException...

 # dictionary as value FAILS when outer key is present with null value
 >>> print sqlCtx.jsonRDD(sc.parallelize(['{"key0": null}', '{"key0": {"key1": "value1"}}'])).collect()
 Py4JJavaError: An error occurred while calling o98.collect.
 : org.apache.spark.SparkException: Job aborted due to stage failure: Task
 14 in stage 9.0 failed 4 times, most recent failure: Lost task 14.3 in stage
 9.0 (TID 305, kunitz.research.intel-research.net):
 java.lang.NullPointerException...

 # nested lists work even when outer key is missing
 >>> print sqlCtx.jsonRDD(sc.parallelize(['{}', '{"key0": [["item0", "item1"], ["item2", "item3"]]}'])).collect()
 [Row(key0=None), Row(key0=[[u'item0', u'item1'], [u'item2', u'item3']])]

 Is anyone able to replicate this behavior?

 -Brad




 On Tue, Aug 5, 2014 at 6:11 PM, Michael Armbrust mich...@databricks.com
 wrote:

 We try to keep master very stable, but this is where active development
 happens. YMMV, but a lot of people do run very close to master without
 incident (myself included).

 branch-1.0 has been cut for a while and we only merge bug fixes into it
 (this is stricter for non-alpha components like Spark core).  For Spark
 SQL, this branch is pretty far behind, as the project is very young and we
 are fixing bugs / adding features very rapidly compared with Spark core.

 branch-1.1 was just cut and is being QAed for a release; at this point
 it's likely the same as master, but that will change as features start
 getting added to master in the coming weeks.



 On Tue, Aug 5, 2014 at 5:38 PM, Nicholas Chammas
 nicholas.cham...@gmail.com wrote:

 collect() works, too.

  >>> sqlContext.jsonRDD(sc.parallelize(['{"foo": [[1,2,3], [4,5,6]]}', '{"foo": [[1,2,3], [4,5,6]]}'])).collect()
 [Row(foo=[[1, 2, 3], [4, 5, 6]]), Row(foo=[[1, 2, 3], [4, 5, 6]])]

 Can’t answer your question about branch stability, though. Spark is a
 very active project, so stuff is happening all the time.

 Nick



 On Tue, Aug 5, 2014 at 7:20 PM, Brad Miller
 bmill...@eecs.berkeley.edu wrote:

 Hi Nick,

  Can you check that the call to collect() works as well as printSchema()?
  I actually experience that printSchema() works fine, but then it crashes
  on collect().

  In general, should I expect the master (which seems to be on branch-1.1)
  to be any more/less stable than branch-1.0?  While it would be great to
  have this fixed, it would be good to know if I should expect lots of
  other instability.

 best,
 -Brad


 On Tue, Aug 5, 2014 at 4:15 PM, Nicholas Chammas
 nicholas.cham...@gmail.com wrote:

 This looks to be fixed in master:

  >>> from pyspark.sql import SQLContext
  >>> sqlContext = SQLContext(sc)
  >>> sc.parallelize(['{"foo": [[1,2,3], [4,5,6]]}', '{"foo": [[1,2,3], [4,5,6]]}'
  ... ])

Problem reading from S3 in standalone application

2014-08-06 Thread sparkuser2345
Hi, 

I'm running Spark in an EMR cluster and I'm able to read from S3 using REPL
without problems: 

val input_file = "s3://bucket-name/test_data.txt"
val rawdata = sc.textFile(input_file)
val test = rawdata.collect

but when I try to run a simple standalone application reading the same data,
I get an error saying that I should provide the access keys: 

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object test {

  def main(args: Array[String]) {

val master = "spark://ec2-xx-xx-xxx-xxx.eu-west-1.compute.amazonaws.com:7077"
val sparkHome = "/home/hadoop/spark/"

val sc = new SparkContext(master, "test", sparkHome, Seq())

val input_file = "s3://bucket-name/test_data.txt"
val rawdata = sc.textFile(input_file)
val test = rawdata.collect
sc.stop() 
  }
}

[error] (run-main-0) java.lang.IllegalArgumentException: AWS Access Key ID
and Secret Access Key must be specified as the username or password
(respectively) of a s3 URL, or by setting the fs.s3.awsAccessKeyId or
fs.s3.awsSecretAccessKey properties (respectively).
java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key
must be specified as the username or password (respectively) of a s3 URL, or
by setting the fs.s3.awsAccessKeyId or fs.s3.awsSecretAccessKey properties
(respectively).
at
org.apache.hadoop.fs.s3.S3Credentials.initialize(S3Credentials.java:70)
at
org.apache.hadoop.fs.s3.Jets3tFileSystemStore.initialize(Jets3tFileSystemStore.java:93)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:190)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:103)
at com.sun.proxy.$Proxy13.initialize(Unknown Source)
at
org.apache.hadoop.fs.s3.S3FileSystem.initialize(S3FileSystem.java:92)
at
org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2397)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:89)
at
org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2431)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2413)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
at
org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:256)
at
org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
at
org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:304)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:172)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1094)
at org.apache.spark.rdd.RDD.collect(RDD.scala:717)
at test$.main(test.scala:17)
at test.main(test.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)

When I add the keys to the file name

val input_file = "s3://access key:secret access key@bucket-name/test_data.txt"

I get an "Input path does not exist" error (keys and bucket name changed
from the error message, naturally):

[error] (run-main-0) org.apache.hadoop.mapred.InvalidInputException: Input
path does not exist: s3://access key:secret access
key@bucket-name/test_data.txt
org.apache.hadoop.mapred.InvalidInputException: Input path does not exist:
s3://access key:secret access key@bucket-name/test_data.txt
at
org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:285)
at
org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
at
org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:304)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:172)
 

Re: Problem reading from S3 in standalone application

2014-08-06 Thread sparkuser2345
I'm getting the same "Input path does not exist" error also after setting the
AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables and using
the format s3://bucket-name/test_data.txt for the input file.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Problem-reading-from-S3-in-standalone-application-tp11524p11526.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Problem reading from S3 in standalone application

2014-08-06 Thread Evan Sparks
Try s3n://

 On Aug 6, 2014, at 12:22 AM, sparkuser2345 hm.spark.u...@gmail.com wrote:
 
 I'm getting the same Input path does not exist error also after setting the
 AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables and using
 the format s3://bucket-name/test_data.txt  for the input file. 
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Problem-reading-from-S3-in-standalone-application-tp11524p11526.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Setting spark.executor.memory problem

2014-08-06 Thread Grzegorz Białek
Hi Andrew,

Thank you very much for your solution, which works like a charm, and for the
very clear explanation.

Grzegorz


fail to run LBFGS in 5G KDD data in spark 1.0.1?

2014-08-06 Thread Lizhengbing (bing, BIPA)
1. I don't use spark-submit to run my program; I use a SparkContext directly:
val conf = new SparkConf()
 .setMaster("spark://123d101suse11sp3:7077")
 .setAppName("LBFGS")
 .set("spark.executor.memory", "30g")
 .set("spark.akka.frameSize", "20")
val sc = new SparkContext(conf)

2. I use KDD data; the size is about 5G.

3. After I execute LBFGS.runLBFGS, at stage 7 the problem occurs:


14/08/06 16:44:45 INFO DAGScheduler: Failed to run aggregate at LBFGS.scala:201
Exception in thread main org.apache.spark.SparkException: Job aborted due to 
stage failure: Task 7.0:12 failed 4 times, most recent failure: TID 304 on host 
123d103suse11sp3 failed for unknown reason
Driver stacktrace:
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1044)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1028)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1026)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1026)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634)
at scala.Option.foreach(Option.scala:236)
at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:634)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1229)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)


Spark GraphX remembering old message

2014-08-06 Thread Arun Kumar
Hi



I am trying to compute beliefs for a graph using the GraphX Pregel
implementation. My use case is: if vertices 2, 3, 4 send messages m2, m3, m4
to vertex 6, then at vertex 6 I multiply all the messages (m2*m3*m4 = m6),
and from vertex 6 the message m6/m2 should be sent back to vertex 2, m6/m3
to vertex 3, and m6/m4 to vertex 4.

The Pregel mergeMsg() method is used to compute m6 (m2*m3*m4 = m6). After
that, when sending the message m6/m2 to vertex 2 in sendMsg(), the old
messages m2, m3, m4 are no longer available, so I cannot compute m6/m2,
m6/m3, m6/m4.

Can somebody guide me on how to resolve this using the GraphX Pregel API?
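
One possible direction (a rough, untested sketch rather than anything from this
thread, with placeholder vertex/edge attribute types): make the Pregel message a
Map keyed by the sender, so mergeMsg keeps every neighbour's individual message
and sendMsg can divide the product back out.

import org.apache.spark.graphx._

// message type: sender -> that sender's message, so nothing is lost in the merge
type Msg = Map[VertexId, Double]

// vertex attribute: (current belief, last merged incoming messages)
def vprog(id: VertexId, attr: (Double, Msg), msg: Msg): (Double, Msg) =
  (msg.values.foldLeft(1.0)(_ * _), msg)              // m6 = m2 * m3 * m4, keep the map

def sendMsg(t: EdgeTriplet[(Double, Msg), Double]): Iterator[(VertexId, Msg)] = {
  val (belief, received) = t.dstAttr
  val contributed = received.getOrElse(t.srcId, 1.0)  // what this neighbour sent last round
  Iterator((t.srcId, Map(t.dstId -> belief / contributed)))  // e.g. m6 / m2 back to vertex 2
}

def mergeMsg(a: Msg, b: Msg): Msg = a ++ b            // keep per-sender messages separate

// val result = graph.pregel(Map.empty[VertexId, Double], 10)(vprog, sendMsg, mergeMsg)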



Thanks

Arun


can't submit my application on standalone spark cluster

2014-08-06 Thread Andres Gomez Ferrer
Hi all,

My name is Andres and I'm starting to use Apache Spark.

I try to submit my spark.jar to my cluster using this:

spark-submit --class net.redborder.spark.RedBorderApplication --master 
spark://pablo02:7077 redborder-spark-selfcontained.jar

But when I did it, my worker died, and my driver too!

This is my driver log:

[INFO] 2014-08-06 06:30:12,025 [Driver-akka.actor.default-dispatcher-3]  
akka.event.slf4j.Slf4jLogger applyOrElse - Slf4jLogger started
[INFO] 2014-08-06 06:30:12,061 [Driver-akka.actor.default-dispatcher-3]  
Remoting apply$mcV$sp - Starting remoting
[ERROR] 2014-08-06 06:30:12,089 [Driver-akka.actor.default-dispatcher-6]  
akka.actor.ActorSystemImpl apply$mcV$sp - Uncaught fatal error from thread 
[Driver-akka.actor.default-dispatcher-3] shutting down ActorSystem [Driver]
java.lang.VerifyError: (class: 
org/jboss/netty/channel/socket/nio/NioWorkerPool, method: createWorker 
signature: 
(Ljava/util/concurrent/Executor;)Lorg/jboss/netty/channel/socket/nio/AbstractNioWorker;)
 Wrong return type in function
at 
akka.remote.transport.netty.NettyTransport.init(NettyTransport.scala:282)
at 
akka.remote.transport.netty.NettyTransport.init(NettyTransport.scala:239)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown 
Source)
at java.lang.reflect.Constructor.newInstance(Unknown Source)
at 
akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$2.apply(DynamicAccess.scala:78)
at scala.util.Try$.apply(Try.scala:161)
at 
akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:73)
at 
akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
at 
akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
at scala.util.Success.flatMap(Try.scala:200)
at 
akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:84)
at akka.remote.EndpointManager$$anonfun$8.apply(Remoting.scala:618)
at akka.remote.EndpointManager$$anonfun$8.apply(Remoting.scala:610)
at 
scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:722)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at 
scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721)
at 
akka.remote.EndpointManager.akka$remote$EndpointManager$$listens(Remoting.scala:610)
at 
akka.remote.EndpointManager$$anonfun$receive$2.applyOrElse(Remoting.scala:450)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[INFO] 2014-08-06 06:30:12,093 [Driver-akka.actor.default-dispatcher-5]  
akka.remote.RemoteActorRefProvider$RemotingTerminator apply$mcV$sp - Shutting 
down remote daemon.
[INFO] 2014-08-06 06:30:12,095 [Driver-akka.actor.default-dispatcher-5]  
akka.remote.RemoteActorRefProvider$RemotingTerminator apply$mcV$sp - Remote 
daemon shut down; proceeding with flushing remote transports.
[INFO] 2014-08-06 06:30:12,102 [Driver-akka.actor.default-dispatcher-3]  
Remoting apply$mcV$sp - Remoting shut down
[INFO] 2014-08-06 06:30:12,104 [Driver-akka.actor.default-dispatcher-3]  
akka.remote.RemoteActorRefProvider$RemotingTerminator apply$mcV$sp - Remoting 
shut down.
[ERROR] [08/06/2014 06:30:22.065] [main] [Remoting] Remoting error: [Startup 
timed out] [
akka.remote.RemoteTransportException: Startup timed out
at 
akka.remote.Remoting.akka$remote$Remoting$$notifyError(Remoting.scala:129)
at akka.remote.Remoting.start(Remoting.scala:191)
at 
akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)
at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:579)
at akka.actor.ActorSystemImpl._start(ActorSystem.scala:577)
at akka.actor.ActorSystemImpl.start(ActorSystem.scala:588)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:111)
  

Re: can't submit my application on standalone spark cluster

2014-08-06 Thread Akhil Das
Looks like a netty conflict there. Most likely you have multiple versions of
netty jars (e.g. netty-3.6.6.Final.jar, netty-3.2.2.Final.jar,
netty-all-4.0.13.Final.jar); you only require 3.6.6, I believe. A quick fix
would be to remove the rest of them.
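
If the fat jar is built with sbt, one build-time way to do that is to exclude
the unwanted Netty artifacts from whichever dependency drags them in. A rough
sketch for build.sbt ("com.example" %% "some-lib" and the version are
placeholders, not anything from this thread):

// keep io.netty:netty (3.6.6.Final, used by Akka) and drop the conflicting
// artifacts pulled in transitively by the hypothetical "some-lib"
libraryDependencies += "com.example" %% "some-lib" % "1.0" excludeAll(
  ExclusionRule(organization = "org.jboss.netty"),              // old Netty 3.2.x
  ExclusionRule(organization = "io.netty", name = "netty-all")  // Netty 4.x bundle
)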

Thanks
Best Regards


On Wed, Aug 6, 2014 at 3:05 PM, Andres Gomez Ferrer ago...@redborder.net
wrote:

 Hi all,

 My name is Andres and I'm starting to use Apache Spark.

 I try to submit my spark.jar to my cluster using this:

 spark-submit --class net.redborder.spark.RedBorderApplication --master
 spark://pablo02:7077 redborder-spark-selfcontained.jar

 But when I did it .. My worker die .. and my driver too!

 This is my driver log:

 [INFO] 2014-08-06 06:30:12,025 [Driver-akka.actor.default-dispatcher-3]
  akka.event.slf4j.Slf4jLogger applyOrElse - Slf4jLogger started
 [INFO] 2014-08-06 06:30:12,061 [Driver-akka.actor.default-dispatcher-3]
  Remoting apply$mcV$sp - Starting remoting
 [ERROR] 2014-08-06 06:30:12,089 [Driver-akka.actor.default-dispatcher-6]
  akka.actor.ActorSystemImpl apply$mcV$sp - Uncaught fatal error from thread
 [Driver-akka.actor.default-dispatcher-3] shutting down ActorSystem [Driver]
 java.lang.VerifyError: (class:
 org/jboss/netty/channel/socket/nio/NioWorkerPool, method: createWorker
 signature:
 (Ljava/util/concurrent/Executor;)Lorg/jboss/netty/channel/socket/nio/AbstractNioWorker;)
 Wrong return type in function
 at
 akka.remote.transport.netty.NettyTransport.init(NettyTransport.scala:282)
 at
 akka.remote.transport.netty.NettyTransport.init(NettyTransport.scala:239)
 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
 at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
 at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown
 Source)
 at java.lang.reflect.Constructor.newInstance(Unknown Source)
 at
 akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$2.apply(DynamicAccess.scala:78)
 at scala.util.Try$.apply(Try.scala:161)
 at
 akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:73)
 at
 akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
 at
 akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
 at scala.util.Success.flatMap(Try.scala:200)
 at
 akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:84)
 at akka.remote.EndpointManager$$anonfun$8.apply(Remoting.scala:618)
 at akka.remote.EndpointManager$$anonfun$8.apply(Remoting.scala:610)
 at
 scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:722)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
 at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
 at
 scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721)
 at
 akka.remote.EndpointManager.akka$remote$EndpointManager$$listens(Remoting.scala:610)
 at
 akka.remote.EndpointManager$$anonfun$receive$2.applyOrElse(Remoting.scala:450)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
 at akka.actor.ActorCell.invoke(ActorCell.scala:456)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
 at akka.dispatch.Mailbox.run(Mailbox.scala:219)
 at
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 [INFO] 2014-08-06 06:30:12,093 [Driver-akka.actor.default-dispatcher-5]
  akka.remote.RemoteActorRefProvider$RemotingTerminator apply$mcV$sp -
 Shutting down remote daemon.
 [INFO] 2014-08-06 06:30:12,095 [Driver-akka.actor.default-dispatcher-5]
  akka.remote.RemoteActorRefProvider$RemotingTerminator apply$mcV$sp -
 Remote daemon shut down; proceeding with flushing remote transports.
 [INFO] 2014-08-06 06:30:12,102 [Driver-akka.actor.default-dispatcher-3]
  Remoting apply$mcV$sp - Remoting shut down
 [INFO] 2014-08-06 06:30:12,104 [Driver-akka.actor.default-dispatcher-3]
  akka.remote.RemoteActorRefProvider$RemotingTerminator apply$mcV$sp -
 Remoting shut down.
 [ERROR] [08/06/2014 06:30:22.065] [main] [Remoting] Remoting error:
 [Startup timed out] [
 akka.remote.RemoteTransportException: Startup timed out
 at
 akka.remote.Remoting.akka$remote$Remoting$$notifyError(Remoting.scala:129)
 at akka.remote.Remoting.start(Remoting.scala:191)
 at
 akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)
 at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:579)
 at akka.actor.ActorSystemImpl._start(ActorSystem.scala:577)
 at 

Re: Problem reading from S3 in standalone application

2014-08-06 Thread sparkuser2345
Evan R. Sparks wrote
 Try s3n://

Thanks, that works! In the REPL, I can successfully load the data using both
s3:// and s3n://, so why the difference?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Problem-reading-from-S3-in-standalone-application-tp11524p11537.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark SQL (version 1.1.0-SNAPSHOT) should allow SELECT with duplicated columns

2014-08-06 Thread Jianshi Huang
Spark reported a java.lang.IllegalArgumentException with this message:

java.lang.IllegalArgumentException: requirement failed: Found fields with
the same name.
at scala.Predef$.require(Predef.scala:233)
at
org.apache.spark.sql.catalyst.types.StructType.init(dataTypes.scala:317)
at
org.apache.spark.sql.catalyst.types.StructType$.fromAttributes(dataTypes.scala:310)
at
org.apache.spark.sql.parquet.ParquetTypesConverter$.convertToString(ParquetTypes.scala:306)
at
org.apache.spark.sql.parquet.ParquetTableScan.execute(ParquetTableOperations.scala:83)
at
org.apache.spark.sql.execution.Filter.execute(basicOperators.scala:57)
at
org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:85)
at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:433)

After trial and error, it seems this is caused by duplicated columns in my
SELECT clause.

I made the duplication on purpose so that my code parses correctly. I think
we should allow users to specify duplicated columns in the return value.
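
A possible workaround in the meantime (a hedged sketch; "events", "col" and the
sqlContext are placeholder assumptions, and whether it avoids the Parquet scan
path above would need to be verified): give the repeated column distinct
aliases so the resulting schema has unique field names.

// sqlContext.sql("SELECT col, col FROM events").collect()           // fails as above
val rows = sqlContext.sql("SELECT col AS col_a, col AS col_b FROM events").collect()
// distinct aliases keep StructType's unique-field-name requirement satisfied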


-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: Spark stream data from kafka topics and output as parquet file on HDFS

2014-08-06 Thread Mahebub Sayyed
Hello,

I have referred to the link https://github.com/dibbhatt/kafka-spark-consumer
and have successfully consumed tuples from Kafka.
The tuples are JSON objects, and I want to store those objects in HDFS in
Parquet format.

Please suggest a sample example for that.
Thanks in advance.





On Tue, Aug 5, 2014 at 11:55 AM, Dibyendu Bhattacharya 
dibyendu.bhattach...@gmail.com wrote:

 You can try this Kafka Spark Consumer which I recently wrote. This uses
 the Low Level Kafka Consumer

 https://github.com/dibbhatt/kafka-spark-consumer

 Dibyendu




 On Tue, Aug 5, 2014 at 12:52 PM, rafeeq s rafeeq.ec...@gmail.com wrote:

 Hi,

  I am new to Apache Spark and trying to develop a Spark Streaming program
  to *stream data from kafka topics and output it as parquet files on HDFS*.

  Please share a *sample reference* program to stream data from kafka
  topics and output parquet files on HDFS.

 Thanks in Advance.

 Regards,

 Rafeeq S
 *(“What you do is what matters, not what you think or say or plan.” )*





-- 
*Regards,*
*Mahebub Sayyed*


Re: Save an RDD to a SQL Database

2014-08-06 Thread Ron Gonzalez
Hi Vida,
  It's possible to save an RDD as a hadoop file using hadoop output formats. It 
might be worthwhile to investigate using DBOutputFormat and see if this will 
work for you.
  I haven't personally written to a db, but I'd imagine this would be one way 
to do it.

Thanks,
Ron

Sent from my iPhone

 On Aug 5, 2014, at 8:29 PM, Vida Ha vid...@gmail.com wrote:
 
 
 Hi,
 
 I would like to save an RDD to a SQL database.  It seems like this would be a 
 common enough use case.  Are there any built in libraries to do it?
 
 Otherwise, I'm just planning on mapping my RDD and having that call a method 
 to write to the database.  Given that a lot of records are going to be 
 written, the code would need to be smart and do a batch insert after enough 
 records have accumulated.  Does that sound like a reasonable approach?
 
 
 -Vida
 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Problem reading from S3 in standalone application

2014-08-06 Thread Nicholas Chammas
See here: https://wiki.apache.org/hadoop/AmazonS3

s3:// refers to a block storage system and is deprecated (see
http://docs.aws.amazon.com/ElasticMapReduce/latest/DeveloperGuide/emr-plan-file-systems.html).
Use s3n:// for regular files you can see in the S3 web console.
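
For the standalone app earlier in this thread, that would look roughly like the
following sketch (the bucket name is a placeholder, and it assumes the keys are
available as environment variables):

val sc = new SparkContext(master, "test", sparkHome, Seq())

// hand the credentials to the s3n filesystem instead of embedding them in the URL
sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", sys.env("AWS_ACCESS_KEY_ID"))
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", sys.env("AWS_SECRET_ACCESS_KEY"))

val rawdata = sc.textFile("s3n://bucket-name/test_data.txt")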

Nick



On Wed, Aug 6, 2014 at 6:43 AM, sparkuser2345 hm.spark.u...@gmail.com
wrote:

 Evan R. Sparks wrote
  Try s3n://

 Thanks, that works! In REPL, I can succesfully load the data using both
 s3:// and s3n://, why the difference?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Problem-reading-from-S3-in-standalone-application-tp11524p11537.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




[Streaming] updateStateByKey trouble

2014-08-06 Thread Yana Kadiyska
Hi folks, hoping someone who works with Streaming can help me out.

I have the following snippet:

val stateDstream =
  data.map(x => (x, 1))
  .updateStateByKey[State](updateFunc)

stateDstream.saveAsTextFiles(checkpointDirectory, partitions_test)

where data is an RDD of

case class StateKey(host:String,hour:String,customer:String)

when I dump out the stream, I see duplicate values in the same partition
(I've bolded the keys that are identical):

(StateKey(foo.com.br,2014-07-22-18,16),State(43,2014-08-06T14:05:29.831Z))
(*StateKey*(www.abcd.com,2014-07-22-22,25),State(2564,2014-08-06T14:05:29.831Z))
(StateKey(bar.com,2014-07-04-20,29),State(77,2014-08-06T14:05:29.831Z))
(*StateKey*(www.abcd.com,2014-07-22-22,25),State(1117,2014-08-06T14:05:29.831Z))


I was under the impression that on each batch, the stream will contain a
single RDD with Key-Value pairs, reflecting the latest state of each key.
Am I misunderstanding this? Or is the key equality somehow failing?

Any tips on this appreciated...

PS. For completeness State is
case class State(val count:Integer,val update_date:DateTime)
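
For reference, here is a minimal sketch of an update function with the
signature updateStateByKey expects here (this is not the actual updateFunc
from the job, and it assumes Joda-Time for DateTime):

import org.joda.time.DateTime

// (new values for a key in this batch, previous state) => new state
val updateFunc = (values: Seq[Int], state: Option[State]) => {
  val previous = state.map(_.count.intValue).getOrElse(0)
  Some(State(previous + values.sum, new DateTime()))
}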


Re: trouble with jsonRDD and jsonFile in pyspark

2014-08-06 Thread Nicholas Chammas
Nice catch Brad and thanks to Yin and Davies for getting on it so quickly.


On Wed, Aug 6, 2014 at 2:45 AM, Davies Liu dav...@databricks.com wrote:

 There is a PR to fix this: https://github.com/apache/spark/pull/1802

 On Tue, Aug 5, 2014 at 10:11 PM, Brad Miller bmill...@eecs.berkeley.edu
 wrote:
  I concur that printSchema works; it just seems to be operations that use
 the
  data where trouble happens.
 
  Thanks for posting the bug.
 
  -Brad
 
 
  On Tue, Aug 5, 2014 at 10:05 PM, Yin Huai yh...@databricks.com wrote:
 
  I tried jsonRDD(...).printSchema() and it worked. Seems the problem is
  when we take the data back to the Python side, SchemaRDD#javaToPython
 failed
  on your cases. I have created
  https://issues.apache.org/jira/browse/SPARK-2875 to track it.
 
  Thanks,
 
  Yin
 
 
  On Tue, Aug 5, 2014 at 9:20 PM, Brad Miller bmill...@eecs.berkeley.edu
 
  wrote:
 
  Hi All,
 
  I checked out and built master.  Note that Maven had a problem building
  Kafka (in my case, at least); I was unable to fix this easily so I
 moved on
  since it seemed unlikely to have any influence on the problem at hand.
 
  Master improves functionality (including the example Nicholas just
  demonstrated) but unfortunately there still seems to be a bug related
 to
  using dictionaries as values.  I've put some code below to illustrate
 the
  bug.
 
  # dictionary as value works fine
   print sqlCtx.jsonRDD(sc.parallelize(['{key0: {key1:
   value}}'])).collect()
  [Row(key0=Row(key1=u'value'))]
 
  # dictionary as value works fine, even when inner keys are varied
   print sqlCtx.jsonRDD(sc.parallelize(['{key0: {key1: value1}}',
   '{key0: {key2: value2}}'])).collect()
  [Row(key0=Row(key1=u'value1', key2=None)), Row(key0=Row(key1=None,
  key2=u'value2'))]
 
  # dictionary as value works fine when inner keys are missing and outer
  key is present
   print sqlCtx.jsonRDD(sc.parallelize(['{key0: {}}', '{key0:
 {key1:
   value1}}'])).collect()
  [Row(key0=Row(key1=None)), Row(key0=Row(key1=u'value1'))]
 
  # dictionary as value FAILS when outer key is missing
   print sqlCtx.jsonRDD(sc.parallelize(['{}', '{key0: {key1:
   value1}}'])).collect()
  Py4JJavaError: An error occurred while calling o84.collect.
  : org.apache.spark.SparkException: Job aborted due to stage failure:
 Task
  14 in stage 7.0 failed 4 times, most recent failure: Lost task 14.3 in
 stage
  7.0 (TID 242, engelland.research.intel-research.net):
  java.lang.NullPointerException...
 
  # dictionary as value FAILS when outer key is present with null value
   print sqlCtx.jsonRDD(sc.parallelize(['{key0: null}', '{key0:
   {key1: value1}}'])).collect()
  Py4JJavaError: An error occurred while calling o98.collect.
  : org.apache.spark.SparkException: Job aborted due to stage failure:
 Task
  14 in stage 9.0 failed 4 times, most recent failure: Lost task 14.3 in
 stage
  9.0 (TID 305, kunitz.research.intel-research.net):
  java.lang.NullPointerException...
 
  # nested lists work even when outer key is missing
   print sqlCtx.jsonRDD(sc.parallelize(['{}', '{key0: [[item0,
   item1], [item2, item3]]}'])).collect()
  [Row(key0=None), Row(key0=[[u'item0', u'item1'], [u'item2',
 u'item3']])]
 
  Is anyone able to replicate this behavior?
 
  -Brad
 
 
 
 
  On Tue, Aug 5, 2014 at 6:11 PM, Michael Armbrust 
 mich...@databricks.com
  wrote:
 
  We try to keep master very stable, but this is where active
 development
  happens. YMMV, but a lot of people do run very close to master without
  incident (myself included).
 
  branch-1.0 has been cut for a while and we only merge bug fixes into
 it
  (this is more strict for non-alpha components like spark core.).  For
 Spark
  SQL, this branch is pretty far behind as the project is very young
 and we
  are fixing bugs / adding features very rapidly compared with Spark
 core.
 
  branch-1.1 was just cut and is being QAed for a release, at this point
  its likely the same as master, but that will change as features start
  getting added to master in the coming weeks.
 
 
 
  On Tue, Aug 5, 2014 at 5:38 PM, Nicholas Chammas
  nicholas.cham...@gmail.com wrote:
 
  collect() works, too.
 
   sqlContext.jsonRDD(sc.parallelize(['{foo:[[1,2,3], [4,5,6]]}',
   '{foo:[[1,2,3], [4,5,6]]}'])).collect()
  [Row(foo=[[1, 2, 3], [4, 5, 6]]), Row(foo=[[1, 2, 3], [4, 5, 6]])]
 
  Can’t answer your question about branch stability, though. Spark is a
  very active project, so stuff is happening all the time.
 
  Nick
 
 
 
  On Tue, Aug 5, 2014 at 7:20 PM, Brad Miller
  bmill...@eecs.berkeley.edu wrote:
 
  Hi Nick,
 
  Can you check that the call to collect() works as well as
  printSchema()?  I actually experience that printSchema() works
 fine, but
  then it crashes on collect().
 
  In general, should I expect the master (which seems to be on
  branch-1.1) to be any more/less stable than branch-1.0?  While it
 would be
  great to have this fixed, it would be good to know if I should
 expect lots
  of other instability.
 
  best,
  

Re: fail to run LBFGS in 5G KDD data in spark 1.0.1?

2014-08-06 Thread Xiangrui Meng
Do you mind testing 1.1-SNAPSHOT and allocating more memory to the driver?
I think the problem is with the feature dimension. KDD data has more than
20M features and in v1.0.1, the driver collects the partial gradients one
by one, sums them up, does the update, and then sends the new weights back
to executors one by one. In 1.1-SNAPSHOT, we switched to multi-level tree
aggregation and torrent broadcasting.

For the driver memory, you can set it with spark-submit using
`--driver-memory 30g`. You can confirm it by visiting the storage tab in
the WebUI.

-Xiangrui


On Wed, Aug 6, 2014 at 1:58 AM, Lizhengbing (bing, BIPA) 
zhengbing...@huawei.com wrote:

  1 I don’t use spark_submit to run my problem and use spark context
 directly

 val conf = new SparkConf()
  .setMaster(spark://123d101suse11sp3:7077)
  .setAppName(LBFGS)
  .set(spark.executor.memory, 30g)
  .set(spark.akka.frameSize,20)
 val sc = new SparkContext(conf)



 2 I use KDD data, size is about 5G



 3 After I execute LBFGS.runLBFGS, at the stage of 7, the problem occus:





 14/08/06 16:44:45 INFO DAGScheduler: Failed to run aggregate at
 LBFGS.scala:201

 Exception in thread main org.apache.spark.SparkException: Job aborted
 due to stage failure: Task 7.0:12 failed 4 times, most recent failure: TID
 304 on host 123d103suse11sp3 failed for unknown reason

 Driver stacktrace:

 at org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1044)

 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1028)

 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1026)

 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

 at
 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)

 at
 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1026)

 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634)

 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634)

 at scala.Option.foreach(Option.scala:236)

 at
 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:634)

 at
 org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1229)

 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)

 at akka.actor.ActorCell.invoke(ActorCell.scala:456)

 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)

 at akka.dispatch.Mailbox.run(Mailbox.scala:219)

 at
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)

 at
 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

 at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)



Re: Save an RDD to a SQL Database

2014-08-06 Thread Yana
Hi Vida,

I am writing to a DB -- or trying to :).

I believe the best practice for this (you can search the mailing list
archives) is to use a combination of mapPartitions and a grouped
iterator.
Look at this thread, esp. the comment from A. Boisvert and Matei's comment
above it:
https://groups.google.com/forum/#!topic/spark-users/LUb7ZysYp2k

Basically the short story is that you want to open as few connections as
possible but write more than 1 insert at a time.
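
A minimal sketch of that pattern (assuming an RDD[(String, Int)] and a
placeholder JDBC URL and table; adjust to your schema):

import java.sql.DriverManager

rdd.foreachPartition { records =>
  // one connection per partition, many rows per executeBatch()
  val conn = DriverManager.getConnection("jdbc:postgresql://host/db", "user", "pass")
  val stmt = conn.prepareStatement("INSERT INTO events (host, cnt) VALUES (?, ?)")
  try {
    records.grouped(1000).foreach { batch =>
      batch.foreach { case (host, cnt) =>
        stmt.setString(1, host)
        stmt.setInt(2, cnt)
        stmt.addBatch()
      }
      stmt.executeBatch()
    }
  } finally {
    stmt.close()
    conn.close()
  }
}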





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Save-an-RDD-to-a-SQL-Database-tp11516p11549.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



PySpark, numpy arrays and binary data

2014-08-06 Thread Rok Roskar
Hello,

I'm interested in getting started with Spark to scale our scientific analysis 
package (http://pynbody.github.io) to larger data sets. The package is written 
in Python and makes heavy use of numpy/scipy and related frameworks. I've got a 
couple of questions that I have not been able to find easy answers to despite 
some research efforts... I hope someone here can clarify things for me a bit!

* is there a preferred way to read binary data off a local disk directly into 
an RDD? Our I/O routines are built to read data in chunks and each chunk could 
be read by a different process/RDD, but it's not clear to me how to accomplish 
this with the existing API. Since the idea is to process data sets that don't 
fit into a single node's memory, reading first and then distributing via 
sc.parallelize is obviously not an option. 

* related to the first question -- when an RDD is created by parallelizing a 
numpy array, the array gets serialized and distributed. I see in the source 
that it actually gets written into a file first (!?) -- but surely the Py4J 
bottleneck for python array types (mentioned in the source comment) doesn't 
really apply to numpy arrays? Is it really necessary to dump the data onto disk 
first? Conversely, the collect() seems really slow and I suspect that this is 
due to the combination of disk I/O and python list creation. Are there any ways 
of getting around this if numpy arrays are being used? 


I'd be curious about any other best-practices tips anyone might have for 
running pyspark with numpy data...! 

Thanks!


Rok



Regarding tooling/performance vs RedShift

2014-08-06 Thread Gary Malouf
My company is leaning towards moving much of its analytics work from our
own Spark/Mesos/HDFS/Cassandra setup to RedShift.  To date, I have been
the internal advocate for using Spark for analytics, but a number of good
points have been brought up to me.  The reasons being pushed are:

- RedShift exposes a jdbc interface out of the box (no devops work there)
and data looks and feels like it is in a normal sql database.  They want
this out of the box from Spark, without having to figure out which version
of Spark matches which version of Hive/Shark/SparkSQL etc.  Yes, the next
release theoretically supports this, but there have been release issues our
team has battled to date that erode the trust.

- Complaints around challenges we have faced running a spark shell locally
against a cluster in EC2.  It is partly a devops issue of deploying the
correct configurations to local machines, being able to kick a user off
hogging RAM, etc.

- "I want to be able to run queries from my python shell against your
sequence file data, roll it up, and in the same shell leverage python graph
tools."  I'm not very familiar with the Python setup, but I believe this
could be done by being able to run locally AND somehow adding custom
libraries to be accessed from PySpark.

- Joins will perform much better (in RedShift) because it says it sorts
its keys.  We cannot pre-compute all joins away.


Basically, their argument is two-fold:

1) We get tooling out of the box from RedShift (specifically, stable JDBC
access) - Spark we often are waiting for devops to get the right combo of
tools working or for libraries to support sequence files.

2) There is a belief that for many of our queries (assumed to often be
joins) a columnar database will perform orders of magnitude better.



Anyway, a test is being set up to compare the two on the performance side,
but from a tools perspective it's hard to counter the issues that are
brought up.


Re: Spark shell creating a local SparkContext instead of connecting to connecting to Spark Master

2014-08-06 Thread Aniket Bhatnagar
Thanks. This worked :). I am thinking I should add this to spark-env.sh so
that spark-shell always connects to the master by default.
On Aug 6, 2014 12:04 AM, Akhil Das ak...@sigmoidanalytics.com wrote:

 ​You can always start your spark-shell by specifying the master as

 MASTER=spark://*whatever*:7077 $SPARK_HOME/bin/spark-shell​

 Then it will connect to that *whatever* master.


 Thanks
 Best Regards


 On Tue, Aug 5, 2014 at 8:51 PM, Aniket Bhatnagar 
 aniket.bhatna...@gmail.com wrote:

 Hi

 Apologies if this is a noob question. I have setup Spark 1.0.1 on EMR
 using a slightly modified version of script
 @ s3://elasticmapreduce/samples/spark/1.0.0/install-spark-shark-yarn.rb. It
 seems to be running fine with master logs stating:

 14/08/05 14:36:56 INFO Master: I have been elected leader! New state:
 ALIVE
 14/08/05 14:37:21 INFO Master: Registering worker
 ip-10-0-2-80.ec2.internal:52029 with 2 cores, 6.3 GB RAM

 The script has also created spark-env.sh under conf which has the
 following content:

 export SPARK_MASTER_IP=x.x.x.x
 export SCALA_HOME=/home/hadoop/.versions/scala-2.10.3
 export SPARK_LOCAL_DIRS=/mnt/spark/
 export
 SPARK_CLASSPATH=/usr/share/aws/emr/emr-fs/lib/*:/usr/share/aws/emr/lib/*:/home/hadoop/share/hadoop/common/lib/*:/home/hadoop/.versions/2.4.0/share/hadoop/common/lib/hadoop-lzo.jar
 export SPARK_DAEMON_JAVA_OPTS=-verbose:gc -XX:+PrintGCDetails
 -XX:+PrintGCTimeStamps
 export
 SPARK_ASSEMBLY_JAR=/home/hadoop/spark/lib/spark-assembly-1.0.1-hadoop2.4.0.jar

 However, when I run the spark-shell, sc.isLocal returns true. Also, no
 matter how many RDDs I cache, the used memory in the master UI
 (x.x.x.x:7077) shows 0B used. This leads me to believe that the spark-shell
 isn't connecting to Spark master and has started a local instance of spark.
 Is there something I am missing in my setup that allows for spark-shell to
 connect to master?

 Thanks,
 Aniket





Re: Regarding tooling/performance vs RedShift

2014-08-06 Thread Nicholas Chammas

 1) We get tooling out of the box from RedShift (specifically, stable JDBC
 access) - Spark we often are waiting for devops to get the right combo of
 tools working or for libraries to support sequence files.


The arguments about JDBC access and simpler setup definitely make sense. My
first non-trivial Spark application was actually an ETL process that sliced
and diced JSON + tabular data and then loaded it into Redshift. From there
on you got all the benefits of your average C-store database, plus the
added benefit of Amazon managing many annoying setup and admin details for
your Redshift cluster.

One area I'm looking forward to seeing Spark SQL excel at is offering fast
JDBC access to raw data--i.e. directly against S3 / HDFS; no ETL
required. For easy and flexible data exploration, I don't think you can
beat that with a C-store that you have to ETL stuff into.

2) There is a belief that for many of our queries (assumed to often be
 joins) a columnar database will perform orders of magnitude better.


This is definitely an "it depends" statement, but there is a detailed
benchmark here (https://amplab.cs.berkeley.edu/benchmark/) comparing Shark,
Redshift, and other systems. Have you seen it? Redshift does very well, but
Shark is on par or better than it in most of the tests. Of course, going
forward we'll want to see Spark SQL match this kind of performance, and
that remains to be seen.

Nick



On Wed, Aug 6, 2014 at 12:06 PM, Gary Malouf malouf.g...@gmail.com wrote:

 My company is leaning towards moving much of their analytics work from our
 own Spark/Mesos/HDFS/Cassandra set up to RedShift.  To date, I have been
 the internal advocate for using Spark for analytics, but a number of good
 points have been brought up to me.  The reasons being pushed are:

 - RedShift exposes a jdbc interface out of the box (no devops work there)
 and data looks and feels like it is in a normal sql database.  They want
 this out of the box from Spark, no trying to figure out which version
 matches this version of Hive/Shark/SparkSQL etc.  Yes, the next release
 theoretically supports this but there have been release issues our team has
 battled to date that erode the trust.

 - Complaints around challenges we have faced running a spark shell locally
 against a cluster in EC2.  It is partly a devops issue of deploying the
 correct configurations to local machines, being able to kick a user off
 hogging RAM, etc.

 - I want to be able to run queries from my python shell against your
 sequence file data, roll it up and in the same shell leverage python graph
 tools.  - I'm not very familiar with the Python setup, but I believe by
 being able to run locally AND somehow add custom libraries to be accessed
 from PySpark this could be done.

 - Joins will perform much better (in RedShift) because it says it sorts
 it's keys.  We cannot pre-compute all joins away.


 Basically, their argument is two-fold:

 1) We get tooling out of the box from RedShift (specifically, stable JDBC
 access) - Spark we often are waiting for devops to get the right combo of
 tools working or for libraries to support sequence files.

 2) There is a belief that for many of our queries (assumed to often be
 joins) a columnar database will perform orders of magnitude better.



 Anyway, a test is being setup to compare the two on the performance side
 but from a tools perspective it's hard to counter the issues that are
 brought up.



Re: PySpark, numpy arrays and binary data

2014-08-06 Thread Davies Liu
A numpy array can only hold basic types, so we cannot use it during collect()
by default.

Could you give a short example about how numpy array is used in your project?

On Wed, Aug 6, 2014 at 8:41 AM, Rok Roskar rokros...@gmail.com wrote:
 Hello,

 I'm interested in getting started with Spark to scale our scientific
 analysis package (http://pynbody.github.io) to larger data sets. The package
 is written in Python and makes heavy use of numpy/scipy and related
 frameworks. I've got a couple of questions that I have not been able to find
 easy answers to despite some research efforts... I hope someone here can
 clarify things for me a bit!

 * is there a preferred way to read binary data off a local disk directly
 into an RDD? Our I/O routines are built to read data in chunks and each
 chunk could be read by a different process/RDD, but it's not clear to me how
 to accomplish this with the existing API. Since the idea is to process data
 sets that don't fit into a single node's memory, reading first and then
 distributing via sc.parallelize is obviously not an option.

If you already know how to partition the data, then you could use
sc.parallelize() to distribute the descriptions of your data, and then read
the data in parallel from those descriptions.

For example, you can partition your data into (path, start, length) tuples:

partitions = [(path1, start1, length), (path1, start2, length), ...]

def read_chunk(path, start, length):
    # read one chunk of the file and yield records for flatMap
    with open(path, 'rb') as f:
        f.seek(start)
        data = f.read(length)
    # process the data here
    yield data

rdd = sc.parallelize(partitions, len(partitions)).flatMap(lambda c: read_chunk(*c))

 * related to the first question -- when an RDD is created by parallelizing a
 numpy array, the array gets serialized and distributed. I see in the source
 that it actually gets written into a file first (!?) -- but surely the Py4J
 bottleneck for python array types (mentioned in the source comment) doesn't
 really apply to numpy arrays? Is it really necessary to dump the data onto
 disk first? Conversely, the collect() seems really slow and I suspect that
 this is due to the combination of disk I/O and python list creation. Are
 there any ways of getting around this if numpy arrays are being used?


 I'd be curious about any other best-practices tips anyone might have for
 running pyspark with numpy data...!

 Thanks!


 Rok


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming fails - where is the problem?

2014-08-06 Thread durin
Update: I can get it to work by disabling iptables temporarily. However, I
cannot figure out on which port I have to accept traffic. 4040 and any of
the Master or Worker ports mentioned in the previous post don't work.

Can it be one of the randomly assigned ones in the 30k to 60k range? Those
appear to change every time, making it difficult to apply any sensible
rules.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-fails-where-is-the-problem-tp11355p11556.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Regarding tooling/performance vs RedShift

2014-08-06 Thread Daniel, Ronald (ELS-SDG)
Just to point out that the benchmark you point to has Redshift running on HDD 
machines instead of SSD, and it is still faster than Shark in all but one case.

Like Gary, I'm also interested in replacing something we have on Redshift with 
Spark SQL, as it will give me much greater capability to process things. I'm 
willing to sacrifice some performance for the greater capability. But it would 
be nice to see the benchmark updated with Spark SQL, and with a more 
competitive configuration of Redshift.

Best regards, and keep up the great work!

Ron


From: Nicholas Chammas [mailto:nicholas.cham...@gmail.com]
Sent: Wednesday, August 06, 2014 9:30 AM
To: Gary Malouf
Cc: user
Subject: Re: Regarding tooling/performance vs RedShift

1) We get tooling out of the box from RedShift (specifically, stable JDBC 
access) - Spark we often are waiting for devops to get the right combo of tools 
working or for libraries to support sequence files.

The arguments about JDBC access and simpler setup definitely make sense. My 
first non-trivial Spark application was actually an ETL process that sliced and 
diced JSON + tabular data and then loaded it into Redshift. From there on you 
got all the benefits of your average C-store database, plus the added benefit 
of Amazon managing many annoying setup and admin details for your Redshift 
cluster.

One area I'm looking forward to seeing Spark SQL excel at is offering fast JDBC 
access to raw data--i.e. directly against S3 / HDFS; no ETL required. For 
easy and flexible data exploration, I don't think you can beat that with a 
C-store that you have to ETL stuff into.

2) There is a belief that for many of our queries (assumed to often be joins) a 
columnar database will perform orders of magnitude better.

This is definitely an "it depends" statement, but there is a detailed benchmark 
here (https://amplab.cs.berkeley.edu/benchmark/) comparing Shark, Redshift, and 
other systems. Have you seen it? Redshift does very well, but Shark is on par 
or better than it in most of the tests. Of course, going forward we'll want to 
see Spark SQL match this kind of performance, and that remains to be seen.

Nick


On Wed, Aug 6, 2014 at 12:06 PM, Gary Malouf malouf.g...@gmail.com wrote:
My company is leaning towards moving much of their analytics work from our own 
Spark/Mesos/HDFS/Cassandra set up to RedShift.  To date, I have been the 
internal advocate for using Spark for analytics, but a number of good points 
have been brought up to me.  The reasons being pushed are:

- RedShift exposes a jdbc interface out of the box (no devops work there) and 
data looks and feels like it is in a normal sql database.  They want this out 
of the box from Spark, no trying to figure out which version matches this 
version of Hive/Shark/SparkSQL etc.  Yes, the next release theoretically 
supports this but there have been release issues our team has battled to date 
that erode the trust.

- Complaints around challenges we have faced running a spark shell locally 
against a cluster in EC2.  It is partly a devops issue of deploying the correct 
configurations to local machines, being able to kick a user off hogging RAM, 
etc.

- I want to be able to run queries from my python shell against your sequence 
file data, roll it up and in the same shell leverage python graph tools.  - 
I'm not very familiar with the Python setup, but I believe by being able to run 
locally AND somehow add custom libraries to be accessed from PySpark this could 
be done.

- Joins will perform much better (in RedShift) because it says it sorts it's 
keys.  We cannot pre-compute all joins away.


Basically, their argument is two-fold:

1) We get tooling out of the box from RedShift (specifically, stable JDBC 
access) - Spark we often are waiting for devops to get the right combo of tools 
working or for libraries to support sequence files.

2) There is a belief that for many of our queries (assumed to often be joins) a 
columnar database will perform orders of magnitude better.



Anyway, a test is being setup to compare the two on the performance side but 
from a tools perspective it's hard to counter the issues that are brought up.



Re: can't submit my application on standalone spark cluster

2014-08-06 Thread Andrew Or
Hi Andres,

If you're using the EC2 scripts to start your standalone cluster, you can
use ~/spark-ec2/copy-dir --delete ~/spark to sync your jars across the
cluster. Note that you will need to restart the Master and the Workers
afterwards through sbin/start-all.sh and sbin/stop-all.sh. If you're
not using the EC2 scripts, you will have to rsync the directory manually
(copy-dir just calls rsync internally).

-Andrew


2014-08-06 2:39 GMT-07:00 Akhil Das ak...@sigmoidanalytics.com:

 Looks like a netty conflict there; most likely you have multiple
 versions of netty jars (e.g.
 netty-3.6.6.Final.jar, netty-3.2.2.Final.jar, netty-all-4.0.13.Final.jar);
 you only require 3.6.6, I believe. A quick fix would be to remove the rest
 of them.

 Thanks
 Best Regards
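
 A hedged sketch of how such duplicate jars are often excluded when the fat jar
 is built with sbt (the dependency shown is only an illustration; the artifact
 that actually pulls in the extra netty versions may be a different one):

 libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.2.0" excludeAll(
   ExclusionRule(organization = "org.jboss.netty"),
   ExclusionRule(organization = "io.netty")
 )

 After rebuilding, listing the contents of redborder-spark-selfcontained.jar
 should show only one netty version.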


 On Wed, Aug 6, 2014 at 3:05 PM, Andres Gomez Ferrer ago...@redborder.net
 wrote:

 Hi all,

 My name is Andres and I'm starting to use Apache Spark.

 I try to submit my spark.jar to my cluster using this:

 spark-submit --class net.redborder.spark.RedBorderApplication --master
 spark://pablo02:7077 redborder-spark-selfcontained.jar

 But when I did it... my worker died... and my driver too!

 This is my driver log:

 [INFO] 2014-08-06 06:30:12,025 [Driver-akka.actor.default-dispatcher-3]
  akka.event.slf4j.Slf4jLogger applyOrElse - Slf4jLogger started
 [INFO] 2014-08-06 06:30:12,061 [Driver-akka.actor.default-dispatcher-3]
  Remoting apply$mcV$sp - Starting remoting
 [ERROR] 2014-08-06 06:30:12,089 [Driver-akka.actor.default-dispatcher-6]
  akka.actor.ActorSystemImpl apply$mcV$sp - Uncaught fatal error from thread
 [Driver-akka.actor.default-dispatcher-3] shutting down ActorSystem [Driver]
 java.lang.VerifyError: (class:
 org/jboss/netty/channel/socket/nio/NioWorkerPool, method: createWorker
 signature:
 (Ljava/util/concurrent/Executor;)Lorg/jboss/netty/channel/socket/nio/AbstractNioWorker;)
 Wrong return type in function
  at
 akka.remote.transport.netty.NettyTransport.init(NettyTransport.scala:282)
 at
 akka.remote.transport.netty.NettyTransport.init(NettyTransport.scala:239)
  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
 at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
  at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown
 Source)
 at java.lang.reflect.Constructor.newInstance(Unknown Source)
  at
 akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$2.apply(DynamicAccess.scala:78)
 at scala.util.Try$.apply(Try.scala:161)
  at
 akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:73)
 at
 akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
  at
 akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
 at scala.util.Success.flatMap(Try.scala:200)
  at
 akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:84)
 at akka.remote.EndpointManager$$anonfun$8.apply(Remoting.scala:618)
  at akka.remote.EndpointManager$$anonfun$8.apply(Remoting.scala:610)
 at
 scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:722)
  at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
 at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
  at
 scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721)
 at
 akka.remote.EndpointManager.akka$remote$EndpointManager$$listens(Remoting.scala:610)
  at
 akka.remote.EndpointManager$$anonfun$receive$2.applyOrElse(Remoting.scala:450)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
  at akka.actor.ActorCell.invoke(ActorCell.scala:456)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
  at akka.dispatch.Mailbox.run(Mailbox.scala:219)
 at
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 [INFO] 2014-08-06 06:30:12,093 [Driver-akka.actor.default-dispatcher-5]
  akka.remote.RemoteActorRefProvider$RemotingTerminator apply$mcV$sp -
 Shutting down remote daemon.
 [INFO] 2014-08-06 06:30:12,095 [Driver-akka.actor.default-dispatcher-5]
  akka.remote.RemoteActorRefProvider$RemotingTerminator apply$mcV$sp -
 Remote daemon shut down; proceeding with flushing remote transports.
 [INFO] 2014-08-06 06:30:12,102 [Driver-akka.actor.default-dispatcher-3]
  Remoting apply$mcV$sp - Remoting shut down
 [INFO] 2014-08-06 06:30:12,104 [Driver-akka.actor.default-dispatcher-3]
  akka.remote.RemoteActorRefProvider$RemotingTerminator apply$mcV$sp -
 Remoting shut down.
 

Re: Submitting to a cluster behind a VPN, configuring different IP address

2014-08-06 Thread nunarob
Hi, 

I'm having the exact same problem - I'm on a VPN and I'm trying to set the
properties spark.httpBroadcast.uri and spark.fileserver.uri so that they
bind to my VPN ip instead of my regular network IP. Were you ever able to
get this working? 

Cheers, 

-Rob




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Submitting-to-a-cluster-behind-a-VPN-configuring-different-IP-address-tp9360p11560.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming fails - where is the problem?

2014-08-06 Thread Andrew Or
Hi Simon,

The drivers and executors currently choose random ports to talk to each
other, so the Spark nodes will have to have full TCP access to each other.
This is changed in a very recent commit, where all of these random ports
will become configurable:
https://github.com/apache/spark/commit/09f7e4587bbdf74207d2629e8c1314f93d865999.
This will be available in Spark 1.1, but for now you will have to open all
ports among the nodes in your cluster.

-Andrew
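
For reference, the new port properties that change is expected to expose (the
port numbers below are arbitrary examples), e.g. in conf/spark-defaults.conf:

  spark.driver.port          50001
  spark.fileserver.port      50002
  spark.broadcast.port       50003
  spark.replClassServer.port 50004
  spark.blockManager.port    50005
  spark.executor.port        50006

With those pinned, the firewall only needs to allow a handful of fixed ports
(plus the master, worker and UI ports) instead of the whole ephemeral range.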


2014-08-06 10:23 GMT-07:00 durin m...@simon-schaefer.net:

 Update: I can get it to work by disabling iptables temporarily. I can,
 however, not figure out on which port I have to accept traffic. 4040 and
 any
 of the Master or Worker ports mentioned in the previous post don't work.

 Can it be one of the randomly assigned ones in the 30k to 60k range? Those
 appear to change every time, making it difficult to apply any sensible
 rules.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-fails-where-is-the-problem-tp11355p11556.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




GraphX Pagerank application

2014-08-06 Thread AlexanderRiggers
I want to use pagerank on a 3GB textfile, which contains a bipartite list
with variables id and brand. 

Example:
id,brand
86246,15343
86246,27873
86246,14647
86246,55172
86246,3293
86246,2820
86246,3830
86246,2820
86246,5603
86246,72482

To perform the page rank I have to create a graph object, adding the edges
by setting sourceID=id and distID=brand. In GraphLab there is function: g =
SGraph().add_edges(data, src_field='id', dst_field='brand')

Is there something similar in GraphX? In the GraphX docs there is an example
where a separate edgelist and usernames are joined, but I couldn't find a
use case for my problem.
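
For reference, a minimal sketch of one way to do this with GraphX's
Graph.fromEdgeTuples, assuming both id and brand are numeric and can be used
directly as vertex ids (the input path is only an example):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

object BipartitePageRank {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("BipartitePageRank"))

    // One "id,brand" pair per line, with a header row to skip.
    val edges = sc.textFile("hdfs:///data/bipartite.csv")
      .filter(line => !line.startsWith("id,"))
      .map { line =>
        val Array(id, brand) = line.split(",")
        (id.toLong, brand.toLong)          // (srcId, dstId)
      }

    // Build the graph; the second argument is the default vertex attribute.
    val graph = Graph.fromEdgeTuples(edges, 1)

    // Run PageRank to a tolerance of 0.001 and look at a few vertices.
    graph.pageRank(0.001).vertices.take(10).foreach(println)
  }
}

If the numbers in the two columns can collide (an id and a brand sharing the
same value would be treated as one vertex), offsetting one of the columns before
building the edges avoids that.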






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-Pagerank-application-tp11562.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Running a Spark Shell locally against EC2

2014-08-06 Thread Andrew Or
Hi Gary,

This has indeed been a limitation of Spark, in that drivers and executors
use random ephemeral ports to talk to each other. If you are submitting a
Spark job from your local machine in client mode (meaning, the driver runs
on your machine), you will need to open up all TCP ports from your worker
machines, a requirement that is not super secure. However, a very recent
commit changes this (
https://github.com/apache/spark/commit/09f7e4587bbdf74207d2629e8c1314f93d865999)
in that you can now manually configure all ports and only open up the ones
you configured. This will be available in Spark 1.1.

-Andrew


2014-08-06 8:29 GMT-07:00 Gary Malouf malouf.g...@gmail.com:

 We have Spark 1.0.1 on Mesos deployed as a cluster in EC2.  Our Devops
 lead tells me that Spark jobs can not be submitted from local machines due
 to the complexity of opening the right ports to the world etc.

 Are other people running the shell locally in a production environment?



Re: Running a Spark Shell locally against EC2

2014-08-06 Thread Gary Malouf
This will be awesome - it's been one of the major issues for our analytics
team as they hope to use their own python libraries.


On Wed, Aug 6, 2014 at 2:40 PM, Andrew Or and...@databricks.com wrote:

 Hi Gary,

 This has indeed been a limitation of Spark, in that drivers and executors
 use random ephemeral ports to talk to each other. If you are submitting a
 Spark job from your local machine in client mode (meaning, the driver runs
 on your machine), you will need to open up all TCP ports from your worker
 machines, a requirement that is not super secure. However, a very recent
 commit changes this (
 https://github.com/apache/spark/commit/09f7e4587bbdf74207d2629e8c1314f93d865999)
 in that you can now manually configure all ports and only open up the ones
 you configured. This will be available in Spark 1.1.

 -Andrew


 2014-08-06 8:29 GMT-07:00 Gary Malouf malouf.g...@gmail.com:

 We have Spark 1.0.1 on Mesos deployed as a cluster in EC2.  Our Devops
 lead tells me that Spark jobs can not be submitted from local machines due
 to the complexity of opening the right ports to the world etc.

 Are other people running the shell locally in a production environment?





Re: issue with spark and bson input

2014-08-06 Thread Dmitriy Selivanov
Finally I made it work. The trick was in asSubclass method:
val mongoRDD = sc.newAPIHadoopFile("file:///root/jobs/dump/input.bson",
classOf[BSONFileInputFormat].asSubclass(classOf[org.apache.hadoop.mapreduce.lib.input.FileInputFormat[Object,
BSONObject]]), classOf[Object], classOf[BSONObject], config)


2014-08-06 0:43 GMT+04:00 Dmitriy Selivanov selivanov.dmit...@gmail.com:

 Hello, I have issue when try to use bson file as spark input. I use
 mongo-hadoop-connector 1.3.0 and spark 1.0.0:
  val sparkConf = new SparkConf()
 val sc = new SparkContext(sparkConf)
 val config = new Configuration()
 config.set("mongo.job.input.format",
 "com.mongodb.hadoop.BSONFileInputFormat")
 config.set("mapred.input.dir", "file:///root/jobs/dump/input.bson")
 config.set("mongo.output.uri", "mongodb://" + args(0) + "/" + args(2))
 val mongoRDD =
 sc.newAPIHadoopFile("file:///root/jobs/dump/input.bson",
 classOf[BSONFileInputFormat], classOf[Object], classOf[BSONObject], config)

 But on the last line I receive the error: inferred type arguments
 [Object,org.bson.BSONObject,com.mongodb.hadoop.BSONFileInputFormat] do not
 conform to method newAPIHadoopFile's type parameter bounds [K,V,F <:
 org.apache.hadoop.mapreduce.InputFormat[K,V]]
 this is very strange, because BSONFileInputFormat
 extends org.apache.hadoop.mapreduce.lib.input.FileInputFormat:
 https://github.com/mongodb/mongo-hadoop/blob/master/core/src/main/java/com/mongodb/hadoop/BSONFileInputFormat.java
 How I can solve this issue?
 I have no problems with com.mongodb.hadoop.MongoInputFormat when use
 mongodb collection as input.
 And moreover seems there is no problem with java api:
 https://github.com/crcsmnky/mongodb-spark-demo/blob/master/src/main/java/com/mongodb/spark/demo/Recommender.java
 I'm not professional java/scala developer, please help.

 --
 Regards
 Dmitriy Selivanov




-- 
Regards
Dmitriy Selivanov


Re: Spark Streaming fails - where is the problem?

2014-08-06 Thread durin
Hi Andrew, 
for this test I only have one machine which provides the master and only 
worker. 
So all I'd need is communication to the Internet to access the twitter API. 
I've tried assigning a specific port to the driver and creating iptables rules 
for this port, but that didn't work. 
Best regards, 
Simon 
On Aug 6, 2014 11:37 AM, "Andrew Or-2 [via Apache Spark User List]" 
<ml-node+s1001560n11561...@n3.nabble.com> wrote: 

Hi Simon, The drivers and executors currently choose random ports to 
talk to each other, so the Spark nodes will have to have full TCP access to 
each other. This is changed in a very recent commit, where all of these random 
ports will become configurable:  
https://github.com/apache/spark/commit/09f7e4587bbdf74207d2629e8c1314f93d865999 
. This will be available in Spark 1.1, but for now you will have to open all 
ports among the nodes in your cluster. 
-Andrew 2014-08-06 10:23 GMT-07:00 durin <[hidden email]>: 
Update: I can get it to work by disabling 
iptables temporarily. I can, 
however, not figure out on which port I have to accept traffic. 4040 and any 
of the Master or Worker ports mentioned in the previous post don't work. 

Can it be one of the randomly assigned ones in the 30k to 60k range? Those 
appear to change every time, making it difficult to apply any sensible 
rules. 



-- 
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-fails-where-is-the-problem-tp11355p11556.html
 

Sent from the Apache Spark User List mailing list archive at Nabble.com. 

- 
To unsubscribe, e-mail: [hidden email] 
For additional commands, e-mail: [hidden email] 















--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-fails-where-is-the-problem-tp11355p11566.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

heterogeneous cluster hardware

2014-08-06 Thread anthonyjschu...@gmail.com
I'm sure this must be a fairly common use-case for spark, yet I have not
found a satisfactory discussion of it on the spark website or forum:

I work at a company with a lot of previous-generation server hardware
sitting idle-- I want to add this hardware to my spark cluster to increase
performance! BUT: It is unclear as to whether the spark master will be able
to properly apportion jobs to the slaves if they have differing hardware
specs.

As I understand, the default spark launch scripts are incompatible with
per-node hardware configurations, but it seems I could compose custom
spark-conf.sh files for each slave to fully utilize its hardware.

Would the master take these per-node configurations into consideration when
allocating work? or would the cluster necessarily fall to the
lowest-common-hardware-denominator?

Is this an area which needs development? I might be willing to look into
attempting to introduce this functionality if it is lacking.
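
For what it's worth, the standard per-worker settings in conf/spark-env.sh
already let each slave advertise what it has (the values below are just
examples for an older box):

  SPARK_WORKER_CORES=8      # cores this worker offers to applications
  SPARK_WORKER_MEMORY=16g   # memory this worker offers to applications

Each worker registers with the master using whatever it is configured to offer,
so mixed hardware can at least advertise different capacities; whether the
scheduler does anything smarter than that is the open question above.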



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/heterogeneous-cluster-hardware-tp11567.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark memory management

2014-08-06 Thread Gary Malouf
I have a few questions about managing Spark memory:

1) In a standalone setup, is their any cpu prioritization across users
running jobs?  If so, what is the behavior here?

2) With Spark 1.1, users will more easily be able to run drivers/shells
from remote locations that do not cause firewall headaches.  Is there a way
to kill an individual user's job from the console without killing workers?
 We are in Mesos and are not aware of an easy way to handle this, but I
imagine standalone mode may handle this.


Spark build error

2014-08-06 Thread Priya Ch
Hi,

I am trying to build jars using the command :

mvn -Pyarn -Phadoop-2.2 -Dhadoop.version=2.2.0 -DskipTests clean package

Execution of the above command is throwing the following error:

[INFO] Spark Project Core . FAILURE [  0.295 s]
[INFO] Spark Project Bagel  SKIPPED
[INFO] Spark Project GraphX ... SKIPPED
[INFO] Spark Project ML Library ... SKIPPED
[INFO] Spark Project Streaming  SKIPPED
[INFO] Spark Project Tools  SKIPPED
[INFO] Spark Project Catalyst . SKIPPED
[INFO] Spark Project SQL .. SKIPPED
[INFO] Spark Project Hive . SKIPPED
[INFO] Spark Project REPL . SKIPPED
[INFO] Spark Project YARN Parent POM .. SKIPPED
[INFO] Spark Project YARN Stable API .. SKIPPED
[INFO] Spark Project Assembly . SKIPPED
[INFO] Spark Project External Twitter . SKIPPED
[INFO] Spark Project External Kafka ... SKIPPED
[INFO] Spark Project External Flume ... SKIPPED
[INFO] Spark Project External ZeroMQ .. SKIPPED
[INFO] Spark Project External MQTT  SKIPPED
[INFO] Spark Project Examples . SKIPPED
[INFO] 
[INFO] BUILD FAILURE
[INFO] 
[INFO] Total time: 3.748 s
[INFO] Finished at: 2014-08-07T01:00:48+05:30
[INFO] Final Memory: 24M/175M
[INFO] 
[ERROR] Failed to execute goal
org.apache.maven.plugins:maven-remote-resources-plugin:1.5:process
(default) on project spark-core_2.10: Execution default of goal
org.apache.maven.plugins:maven-remote-resources-plugin:1.5:process
failed: For artifact {null:null:null:jar}: The groupId cannot be
empty. - [Help 1]
org.apache.maven.lifecycle.LifecycleExecutionException: Failed to
execute goal org.apache.maven.plugins:maven-remote-resources-plugin:1.5:process
(default) on project spark-core_2.10: Execution default of goal
org.apache.maven.plugins:maven-remote-resources-plugin:1.5:process
failed: For artifact {null:null:null:jar}: The groupId cannot be
empty.
at 
org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:224)
at 
org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:153)
at 
org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:145)
at 
org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:116)
at 
org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:80)
at 
org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build(SingleThreadedBuilder.java:51)
at 
org.apache.maven.lifecycle.internal.LifecycleStarter.execute(LifecycleStarter.java:120)
at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:347)
at org.apache.maven.DefaultMaven.execute(DefaultMaven.java:154)
at org.apache.maven.cli.MavenCli.execute(MavenCli.java:584)
at org.apache.maven.cli.MavenCli.doMain(MavenCli.java:213)
at org.apache.maven.cli.MavenCli.main(MavenCli.java:157)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced(Launcher.java:289)
at 
org.codehaus.plexus.classworlds.launcher.Launcher.launch(Launcher.java:229)
at 
org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode(Launcher.java:415)
at 
org.codehaus.plexus.classworlds.launcher.Launcher.main(Launcher.java:356)
Caused by: org.apache.maven.plugin.PluginExecutionException: Execution
default of goal
org.apache.maven.plugins:maven-remote-resources-plugin:1.5:process
failed: For artifact {null:null:null:jar}: The groupId cannot be
empty.
at 
org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:143)
at 
org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:208)
... 19 more
Caused by: org.apache.maven.artifact.InvalidArtifactRTException: For
artifact {null:null:null:jar}: The groupId cannot be empty.



Can someone help me on this ?


Re: Unit Test for Spark Streaming

2014-08-06 Thread JiajiaJing
Thank you TD,

I have worked around that problem and now the test compiles. 
However, I don't actually see that test running. When I do "mvn test", it
just says BUILD SUCCESS, without any TEST section on stdout. 
Are we supposed to use "mvn test" to run the test? Are there any other
methods that can be used to run this test?





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Unit-Test-for-Spark-Streaming-tp11394p11570.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Regarding tooling/performance vs RedShift

2014-08-06 Thread Gary Malouf
Forgot to cc the mailing list :)


On Wed, Aug 6, 2014 at 3:41 PM, Daniel, Ronald (ELS-SDG) 
r.dan...@elsevier.com wrote:

  Agreed. Being able to use SQL to make a table, pass it to a graph
 algorithm, pass that output to a machine learning algorithm, being able to
 invoke user defined python functions, … are capabilities that far exceed
 what we can do with Redshift. The total performance will be much better,
 and the programmer productivity will be much better, even if the SQL
 portion is not quite as fast.  Mostly I was just objecting to "Redshift
 does very well, but Shark is on par or better than it in most of the tests"
 when that was not how I read the results, and Redshift was on HDDs.



 BTW – What are you doing w/ Spark? We have a lot of text and other content
 that we want to mine, and are shifting onto Spark so we have the greater
 capabilities mentioned above.





 Best regards,



 Ron Daniel, Jr.

 Director, Elsevier Labs

 r.dan...@elsevier.com

 mobile: +1 619 208 3064







 *From:* Gary Malouf [mailto:malouf.g...@gmail.com]
 *Sent:* Wednesday, August 06, 2014 12:35 PM
 *To:* Daniel, Ronald (ELS-SDG)

 *Subject:* Re: Regarding tooling/performance vs RedShift



 Hi Ronald,



 In my opinion, the performance just has to be 'close' to make that piece
 irrelevant.  I think the real issue comes down to tooling and the ease of
 connecting their various python tools from the office to results coming out
 of Spark/other solution in 'the cloud'.





 On Wed, Aug 6, 2014 at 1:43 PM, Daniel, Ronald (ELS-SDG) 
 r.dan...@elsevier.com wrote:

 Just to point out that the benchmark you point to has Redshift running on
 HDD machines instead of SSD, and it is still faster than Shark in all but
 one case.



 Like Gary, I'm also interested in replacing something we have on Redshift
 with Spark SQL, as it will give me much greater capability to process
 things. I'm willing to sacrifice some performance for the greater
 capability. But it would be nice to see the benchmark updated with Spark
 SQL, and with a more competitive configuration of Redshift.



 Best regards, and keep up the great work!



 Ron





 *From:* Nicholas Chammas [mailto:nicholas.cham...@gmail.com]
 *Sent:* Wednesday, August 06, 2014 9:30 AM
 *To:* Gary Malouf
 *Cc:* user


 *Subject:* Re: Regarding tooling/performance vs RedShift



 1) We get tooling out of the box from RedShift (specifically, stable JDBC
 access) - Spark we often are waiting for devops to get the right combo of
 tools working or for libraries to support sequence files.



 The arguments about JDBC access and simpler setup definitely make sense.
 My first non-trivial Spark application was actually an ETL process that
 sliced and diced JSON + tabular data and then loaded it into Redshift. From
 there on you got all the benefits of your average C-store database, plus
 the added benefit of Amazon managing many annoying setup and admin details
 for your Redshift cluster.



 One area I'm looking forward to seeing Spark SQL excel at is offering fast
 JDBC access to raw data--i.e. directly against S3 / HDFS; no ETL
 required. For easy and flexible data exploration, I don't think you can
 beat that with a C-store that you have to ETL stuff into.



 2) There is a belief that for many of our queries (assumed to often be
 joins) a columnar database will perform orders of magnitude better.



 This is definitely an "it depends" statement, but there is a detailed
 benchmark here https://amplab.cs.berkeley.edu/benchmark/ comparing
 Shark, Redshift, and other systems. Have you seen it? Redshift does very
 well, but Shark is on par or better than it in most of the tests. Of
 course, going forward we'll want to see Spark SQL match this kind of
 performance, and that remains to be seen.



 Nick





 On Wed, Aug 6, 2014 at 12:06 PM, Gary Malouf malouf.g...@gmail.com
 wrote:

 My company is leaning towards moving much of their analytics work from our
 own Spark/Mesos/HDFS/Cassandra set up to RedShift.  To date, I have been
 the internal advocate for using Spark for analytics, but a number of good
 points have been brought up to me.  The reasons being pushed are:



 - RedShift exposes a jdbc interface out of the box (no devops work there)
 and data looks and feels like it is in a normal sql database.  They want
 this out of the box from Spark, no trying to figure out which version
 matches this version of Hive/Shark/SparkSQL etc.  Yes, the next release
 theoretically supports this but there have been release issues our team has
 battled to date that erode the trust.



 - Complaints around challenges we have faced running a spark shell locally
 against a cluster in EC2.  It is partly a devops issue of deploying the
 correct configurations to local machines, being able to kick a user off
 hogging RAM, etc.



 - I want to be able to run queries from my python shell against your
 sequence file data, roll it up and in the same shell leverage python graph
 tools.  - I'm not very 

Re: Regarding tooling/performance vs RedShift

2014-08-06 Thread Nicholas Chammas
On Wed, Aug 6, 2014 at 3:41 PM, Daniel, Ronald (ELS-SDG)
r.dan...@elsevier.com wrote:

 Mostly I was just objecting to "Redshift does very well, but Shark is on
  par or better than it in most of the tests" when that was not how I read
  the results, and Redshift was on HDDs.


My bad. You are correct; the only test Shark (mem) does better on is test
#1 Scan Query.

And indeed, it would be good to see an updated benchmark with Redshift
running on SSDs.

Nick


Re: Regarding tooling/performance vs RedShift

2014-08-06 Thread Gary Malouf
Also, regarding something like redshift not having MLlib built in, much of
that could be done on the derived results.
On Aug 6, 2014 4:07 PM, Nicholas Chammas nicholas.cham...@gmail.com
wrote:

 On Wed, Aug 6, 2014 at 3:41 PM, Daniel, Ronald (ELS-SDG)
 r.dan...@elsevier.com wrote:

 Mostly I was just objecting to "Redshift does very well, but Shark is
 on par or better than it in most of the tests" when that was not how I
 read the results, and Redshift was on HDDs.


 My bad. You are correct; the only test Shark (mem) does better on is test
 #1 Scan Query.

 And indeed, it would be good to see an updated benchmark with Redshift
 running on SSDs.

 Nick



UpdateStateByKey - How to improve performance?

2014-08-06 Thread Venkat Subramanian
The method

def updateStateByKey[S: ClassTag] ( updateFunc: (Seq[V], Option[S]) =>
Option[S] ): DStream[(K, S)]

takes a DStream (K,V) and produces a DStream (K,S) in Spark Streaming.

We have an input DStream (K,V) that has 40,000 elements. We update on average
1,000 of them in every 3-second batch, but based on how this
updateStateByKey function is defined, we are looping through 40,000 elements
(Seq[V]) to make an update for just 1,000 elements and not updating 39,000
elements. I think looping through the extra 39,000 elements is a waste of
performance.

Isn't there a better way to update this efficiently by just figuring out
a hash map for the 1,000 elements that are required to be updated and just
updating it (without looping through the unwanted elements)? Shouldn't
there be a Streaming update function provided that updates selective members,
or are we missing some concepts here?

I think updateStateByKey may be causing a lot of performance degradation in
our app as we keep doing this again and again for every batch. Please let us
know if my thought process is correct here.
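
For concreteness, a minimal sketch of the signature under discussion (the
socket source and running word count are only illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

object StatefulWordCount {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("StatefulWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(3))
    ssc.checkpoint("checkpoint")   // stateful operations require checkpointing

    val pairs = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" ")).map((_, 1L))

    // The update function is invoked once per key per batch; returning None
    // drops that key's state, which is why it runs over all keys rather than
    // only the keys that received new values.
    val updateFunc = (newValues: Seq[Long], state: Option[Long]) =>
      Some(state.getOrElse(0L) + newValues.sum)

    pairs.updateStateByKey[Long](updateFunc).print()
    ssc.start()
    ssc.awaitTermination()
  }
}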




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/UpdateStateByKey-How-to-improve-performance-tp11575.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Regarding tooling/performance vs RedShift

2014-08-06 Thread Daniel, Ronald (ELS-SDG)
Well yes, MLlib-like routines or pretty much anything else could be run on the 
derived results, but you have to unload the results from Redshift and then load 
them into some other tool. So it's nicer to leave them in memory and operate on 
them there. Major architectural advantage to Spark.

Ron


From: Gary Malouf [mailto:malouf.g...@gmail.com]
Sent: Wednesday, August 06, 2014 1:17 PM
To: Nicholas Chammas
Cc: Daniel, Ronald (ELS-SDG); user@spark.apache.org
Subject: Re: Regarding tooling/performance vs RedShift


Also, regarding something like redshift not having MLlib built in, much of that 
could be done on the derived results.
On Aug 6, 2014 4:07 PM, Nicholas Chammas 
nicholas.cham...@gmail.com wrote:
On Wed, Aug 6, 2014 at 3:41 PM, Daniel, Ronald 
(ELS-SDG) r.dan...@elsevier.com wrote:
Mostly I was just objecting to "Redshift does very well, but Shark is on par 
or better than it in most of the tests" when that was not how I read the 
results, and Redshift was on HDDs.

My bad. You are correct; the only test Shark (mem) does better on is test #1 
Scan Query.

And indeed, it would be good to see an updated benchmark with Redshift running 
on SSDs.

Nick


Re: Writing to RabbitMQ

2014-08-06 Thread Tathagata Das
Yeah, I have observed this common problem in the design a number of times
on the mailing list. Tobias, what you linked to is an additional
problem that occurs with mapPartitions, but not with foreachPartition
(which is relevant here). But I do get your point. I think there was an
attempt made in that respect (a setup function) using RDD.mapWith and
RDD.flatMapWith, but these got deprecated in 1.0.


On Tue, Aug 5, 2014 at 9:54 AM, jschindler john.schind...@utexas.edu
wrote:

 You are correct in that I am trying to publish inside of a foreachRDD loop.
 I am currently refactoring and will try publishing inside the
 foreachPartition loop.  Below is the code showing the way it is currently
 written, thanks!


 object myData {
   def main(args: Array[String]) {

 val ssc = new StreamingContext("local[8]", "Data", Seconds(10))
 ssc.checkpoint("checkpoint")
 val topicMap = Map("pagehit.data" -> 1)

 val factory = new ConnectionFactory()
 factory.setUsername("officialUsername")
 factory.setPassword("crypticPassword")
 factory.setVirtualHost("/")
 factory.setHost("rabbit-env")
 factory.setPort()
 val connection = factory.newConnection()

 val SQLChannel = connection.createChannel()
 SQLChannel.queueDeclare("SQLQueue", true, false, false, null)

 val Pipe = KafkaUtils.createStream(ssc,
 "Zookeeper_1,Zookeeper_1,Zookeeper_3", "Cons1",
 topicMap).map(_._2)

 //PARSE SOME JSON ETC

   windowStream.foreachRDD(pagehit => {
   val mongoClient = MongoClient("my-mongodb")
   val db = mongoClient("myClient")
   val SQLCollection = db("SQLCalls")

   val callArray = pagehit.map(_._1).collect
   val avg = (callArray.reduceLeft[Long](_+_))/callArray.length
   val URL = pagehit.take(1).map(_._2)

   SQLCollection += MongoDBObject("URL" -> URL(0).substring(7,
 URL(0).length - 1),
   "Avg Page Load Time" -> avg)

   val toBuildJSON = Seq(baseMsg, avg.toString, closingBrace)
   val byteArray = toBuildJSON.mkString.getBytes()

   SQLChannel.basicPublish("", "SQLQueue", null, byteArray)

 })



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Writing-to-RabbitMQ-tp11283p11445.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Regarding tooling/performance vs RedShift

2014-08-06 Thread Nicholas Chammas
On Wed, Aug 6, 2014 at 4:30 PM, Daniel, Ronald (ELS-SDG) 
r.dan...@elsevier.com wrote:

 Major architectural advantage to Spark.


Amen to that. For a really cool and succinct demonstration of this, check
out Aaron's demo http://youtu.be/sPhyePwo7FA?t=10m16s at the Hadoop
Summit earlier this year, where he combines SQL, machine learning, and stream
processing using Spark. I don't think you can do this with any other
platform.

Nick


Re: SparkR : lapplyPartition transforms the data in vertical format

2014-08-06 Thread Shivaram Venkataraman
The output of lapply and lapplyPartition should be the same by design -- the
only difference is that in lapply the user-defined function returns a row,
while it returns a list in lapplyPartition.

Could you give an example of a small input and output that you expect to
see for the above program?

Shivaram


On Wed, Aug 6, 2014 at 5:47 AM, Pranay Dave pranay.da...@gmail.com wrote:

 Hello
 As per the documentation, lapply works on single records and lapplyPartition
 works on a partition.
 However, the format of the output does not change.

 When I use lapplypartition, the data is converted to vertical format

 Here is my code
 library(SparkR)


 sc <- sparkR.init("local")
 lines <- textFile(sc, "/sparkdev/datafiles/covariance.txt")

 totals <- lapply(lines, function(lines)
 {


 sumx <- 0
 sumy <- 0
 totaln <- 0
 for (i in 1:length(lines)){
 dataxy <- unlist(strsplit(lines[i], ","))
 sumx <- sumx + as.numeric(dataxy[1])
 sumy <- sumy + as.numeric(dataxy[2])

 }

 ##list(as.numeric(sumx), as.numeric(sumy), as.numeric(sumxy),
 ##as.numeric(totaln))
 ##list does same as below
 c(sumx, sumy)

 }

 )

 output <- collect(totals)
 for (element in output) {
   cat(as.character(element[1]), as.character(element[2]), "\n")
 }

 I am expecting output as 55, 55
 However it is giving
 55,NA
 55,NA

 Where am I going wrong ?
 Thanks
 Pranay



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-lapplyPartition-transforms-the-data-in-vertical-format-tp11540.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark streaming at-least once guarantee

2014-08-06 Thread Tathagata Das
Why can't you keep a persistent queue of S3 files to process? The process
that you are running has two threads
Thread 1: Continuously gets SQS messages and write to the queue. This queue
is persisted to the reliable storage, like HDFS / S3.
Thread 2: Peek into the queue, and whenever there is any message, start a
spark job to process the files. Once the spark job is over, it will dequeue
that file from that queue.
If this process fails in the middle of processing some files, then it can
be restarted and thread 2 will start processing the files in the queue
again (since they were not dequeued, as the jobs had not finished
successfully).

The file input stream in Spark Streaming essentially does this same thing.
So you can either implement this directly. Or, you can subclass
FileInputDStream, and override the functions that find new files to
process. There you can start a different thread, that listens and queues
SQS messages. Then, when the compute() method is called after every batch
interval, the queued SQS messages should be processed and RDDs from the files
need to be generated.

This new input DStream may be worth a try, but I feel that there will be
corner cases which will be harder to deal with in this architecture, but
which can be dealt with in the custom architecture I stated first. Corner cases
like: what if an SQS message is downloaded but the process fails before
creating RDDs out of it (there could be a delay of a batch interval between
those two in Spark Streaming)? When restarted, can you refetch the messages
once again?

Hope this helps!

TD




On Wed, Aug 6, 2014 at 12:13 AM, lalit1303 la...@sigmoidanalytics.com
wrote:

 Hi TD,

 Thanks a lot for your reply :)
 I am already looking into creating a new DStream for SQS messages. It would
 be very helpful if you can provide some guidance regarding the same.

 The main motive of integrating SQS with spark streaming is to make my Jobs
 run in high availability.
 As of now I have a downloader which downloads the files pointed to by SQS
 messages, and then my spark streaming job comes into action to process them. I
 am planning to move the whole architecture into high availability (the spark
 streaming job can easily be shifted to high availability); the only piece left
 is to integrate SQS with spark streaming such that it can automatically
 recover from
 a master node failure. Also, I want to make a single pipeline, starting from
 getting the SQS message to the processing of the corresponding file.

 I couldn't think of any other approach to make my SQS downloader run in
 high
 availability mode. The only thing I have to do is create a DStream which
 reads SQS messages from the corresponding queue.

 Please let me know if there is any other work around.

 Thanks
 -- Lalit



 -
 Lalit Yadav
 la...@sigmoidanalytics.com
 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-at-least-once-guarantee-tp10902p11525.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: UpdateStateByKey - How to improve performance?

2014-08-06 Thread Michael Malak
Depending on the density of your keys, the alternative signature

def updateStateByKey[S](updateFunc: (Iterator[(K, Seq[V], Option[S])]) => 
Iterator[(K, S)], partitioner: Partitioner, rememberPartitioner: 
Boolean)(implicit arg0: ClassTag[S]): DStream[(K, S)] 

at least iterates by key rather than by (old) value.

I believe your thinking is correct that there might be a performance 
improvement opportunity for your case if there were an updateStateByKey() that 
instead iterated by (new) value.

BTW, my impression from the stock examples is that the signature I pasted above 
was intended to be the more typically called updateStateByKey(), as opposed to 
the one you pasted, for which my impression is that it is the more general 
purpose one. I have used the more general purpose one but only when I needed to 
peek into the entire set of states for some unusual reason.



On Wednesday, August 6, 2014 2:30 PM, Venkat Subramanian vsubr...@gmail.com 
wrote:
 


The method

def updateStateByKey[S: ClassTag] ( updateFunc: (Seq[V], Option[S]) =>
Option[S] ): DStream[(K, S)]

takes Dstream (K,V) and Produces DStream (K,S)  in Spark Streaming

We have a input Dstream(K,V) that has 40,000 elements. We update on average
of 1000  elements of them in every 3 second batch, but based on how this
updateStateByKey function is defined, we are looping through 40,000 elements
(Seq[V]) to make an update for just 1000 elements and not updating 39000
elements. I think looping through extra 39000 elements is a waste of
performance.

Isn't there a better way to update this efficiently by just figuring out the
a hash map for the 1000 elements that are required to be updated and just
updating it (without looping through the unwanted elements)?  Shouldn't
there be a Streaming update function provided that updates selective members
or are we missing some concepts here?

I think updateStateByKey may be causing lot of performance degradation in
our app as we keep doing this again and again for every batch. Please let us
know if my thought process is correct here.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/UpdateStateByKey-How-to-improve-performance-tp11575.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Re: UpdateStateByKey - How to improve performance?

2014-08-06 Thread Tathagata Das
Hello Venkat,

Your thoughts are quite spot on. The current implementation was designed to
allow the functionality of timing out a state. For this to be possible, the
update function needs to be called for each key even if there is no new data, so
that the function can check things like the last update time, etc., to time
itself out and return None as the state. However, in Spark 1.2 I plan to
improve on the performance for such scenarios as yours.

For the time being, you could also try other techniques
http://spark.apache.org/docs/latest/streaming-programming-guide.html#reducing-the-processing-time-of-each-batch
for improving performance (if you haven't tried them already). You can also set
the storage level of the DStream to non-serialized, which may improve perf.

TD


On Wed, Aug 6, 2014 at 1:29 PM, Venkat Subramanian vsubr...@gmail.com
wrote:

 The method

  def updateStateByKey[S: ClassTag] ( updateFunc: (Seq[V], Option[S]) =>
  Option[S] ): DStream[(K, S)]

 takes Dstream (K,V) and Produces DStream (K,S)  in Spark Streaming

 We have a input Dstream(K,V) that has 40,000 elements. We update on average
 of 1000  elements of them in every 3 second batch, but based on how this
 updateStateByKey function is defined, we are looping through 40,000
 elements
 (Seq[V]) to make an update for just 1000 elements and not updating 39000
 elements. I think looping through extra 39000 elements is a waste of
 performance.

 Isn't there a better way to update this efficiently by just figuring out
 the
 a hash map for the 1000 elements that are required to be updated and just
 updating it (without looping through the unwanted elements)?  Shouldn't
 there be a Streaming update function provided that updates selective
 members
 or are we missing some concepts here?

 I think updateStateByKey may be causing lot of performance degradation in
 our app as we keep doing this again and again for every batch. Please let
 us
 know if my thought process is correct here.




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/UpdateStateByKey-How-to-improve-performance-tp11575.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark memory management

2014-08-06 Thread Andrew Or
Hey Gary,

The answer to both of your questions is that much of it is up to the
application.

For (1), the standalone master can set spark.deploy.defaultCores to limit
the number of cores each application can grab. However, the application can
override this with the application-specific spark.cores.max, meaning
there is currently nothing the master can do if the application is greedy
and demands all the cores in the world.
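
A minimal sketch of that application-side setting (the value is only an
example):

  import org.apache.spark.{SparkConf, SparkContext}

  // Cap this application at 4 cores instead of grabbing everything the master offers.
  val conf = new SparkConf().setAppName("my-app").set("spark.cores.max", "4")
  val sc = new SparkContext(conf)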

For (2), I am not aware of an existing way the standalone master can kill a
user application. The most you can do is to go to the application SparkUI
and kill the stages (there is a button), though this is not specific to
standalone mode.

There is currently a lot of trust between the standalone master and the
application. Maybe this is not always a good thing. :)

-Andrew


2014-08-06 12:23 GMT-07:00 Gary Malouf malouf.g...@gmail.com:

 I have a few questions about managing Spark memory:

 1) In a standalone setup, is their any cpu prioritization across users
 running jobs?  If so, what is the behavior here?

 2) With Spark 1.1, users will more easily be able to run drivers/shells
 from remote locations that do not cause firewall headaches.  Is there a way
 to kill an individual user's job from the console without killing workers?
  We are in Mesos and are not aware of an easy way to handle this, but I
 imagine standalone mode may handle this.




Re: Unit Test for Spark Streaming

2014-08-06 Thread Tathagata Das
Does it not show the name of the test suite on stdout, showing that it has
passed? Can you try writing a small unit test, in the same way as
your kafka unit test, and with print statements on stdout ... to see
whether it works? I believe it is some configuration issue in maven, which
is hard for me to guess.

TD
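
(One thing worth trying, assuming the module uses the scalatest-maven-plugin
the way Spark's own build does: run a single suite explicitly and watch for its
output, e.g.

  mvn test -DwildcardSuites=com.example.MyStreamingSuite

where the suite name is whatever your test class is called -- it is only a
placeholder here. If that prints nothing either, the plugin is probably not
bound to the test phase in that module's pom.)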


On Wed, Aug 6, 2014 at 12:53 PM, JiajiaJing jj.jing0...@gmail.com wrote:

 Thank you TD,

 I have worked around that problem and now the test compiles.
 However, I don't actually see that test running. When I do "mvn test",
 it
 just says BUILD SUCCESS, without any TEST section on stdout.
 Are we supposed to use "mvn test" to run the test? Are there any other
 methods that can be used to run this test?





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Unit-Test-for-Spark-Streaming-tp11394p11570.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Stopping StreamingContext does not kill receiver

2014-08-06 Thread lbustelo
I'm running on spark 1.0.0 and I see a similar problem when using the
socketTextStream receiver. The ReceiverTracker task sticks around after a
ssc.stop(false).



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Stopping-StreamingContext-does-not-kill-receiver-tp9522p11587.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark stream data from kafka topics and output as parquet file on HDFS

2014-08-06 Thread Tathagata Das
You can use Spark SQL for that very easily. You can take the RDDs you get
from the kafka input stream, convert them to RDDs of case classes, and save them as
parquet files.
More information here:
https://spark.apache.org/docs/latest/sql-programming-guide.html#parquet-files
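
A minimal sketch of that approach, assuming Spark 1.1-era APIs; it uses
sqlContext.jsonRDD to infer the schema instead of hand-written case classes,
and the zookeeper address, topic and output path are placeholders:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaToParquet {
  def main(args: Array[String]) {
    val ssc = new StreamingContext(new SparkConf().setAppName("KafkaToParquet"), Seconds(60))
    val sqlContext = new SQLContext(ssc.sparkContext)

    // The payload (second element of the tuple) is the JSON string from Kafka.
    val lines = KafkaUtils.createStream(ssc, "zk-host:2181", "parquet-writer",
      Map("mytopic" -> 1)).map(_._2)

    lines.foreachRDD { (rdd, time) =>
      if (rdd.count() > 0) {
        // Infer a schema from the JSON strings and write one Parquet directory per batch.
        sqlContext.jsonRDD(rdd).saveAsParquetFile("hdfs:///data/events/batch-" + time.milliseconds)
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}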


On Wed, Aug 6, 2014 at 5:23 AM, Mahebub Sayyed mahebub...@gmail.com wrote:

 Hello,

 I have referred to the link https://github.com/dibbhatt/kafka-spark-consumer
 and I have successfully consumed tuples from kafka.
 The tuples are JSON objects and I want to store those objects in HDFS in parquet
 format.

 Please suggest me any sample example for that.
 Thanks in advance.





 On Tue, Aug 5, 2014 at 11:55 AM, Dibyendu Bhattacharya 
 dibyendu.bhattach...@gmail.com wrote:

 You can try this Kafka Spark Consumer which I recently wrote. This uses
 the Low Level Kafka Consumer

 https://github.com/dibbhatt/kafka-spark-consumer

 Dibyendu




 On Tue, Aug 5, 2014 at 12:52 PM, rafeeq s rafeeq.ec...@gmail.com wrote:

 Hi,

 I am new to Apache Spark and Trying to Develop spark streaming program
 to  *stream data from kafka topics and output as parquet file on HDFS*.

 Please share the *sample reference* program to stream data from kafka
 topics and output as parquet file on HDFS.

 Thanks in Advance.

 Regards,

 Rafeeq S
 *(“What you do is what matters, not what you think or say or plan.” )*





 --
 *Regards,*
 *Mahebub Sayyed*



Re: Spark Streaming multiple streams problem

2014-08-06 Thread Tathagata Das
You probably have only 10 cores in your cluster on which you are executing
your job. Since each DStream / receiver takes one core, the system is
not able to start all of them and so everything is blocked.


On Wed, Aug 6, 2014 at 3:08 AM, Laeeq Ahmed laeeqsp...@yahoo.com.invalid
wrote:

 Hi,

 I am reading multiple streams from multiple ports with a single streaming
  context. I have created an array of DStreams. This works up to 10 streams. But
  if I go over that (I have checked with 15 and 20 streams), the spark streaming
  task gets stuck and takes a long time. I waited for 10 minutes (2.2 min in the
  attached screenshot), and it still did not go through. The attached streaming UI
  shows where it gets stuck.

  If this is not the right way to read multiple streams, what is the
  alternative? I don't want to union the streams. I want to read them
  simultaneously in parallel.

 object StreamAnomalyDetector {

 def calculate(sumOfSquare: Double, sumOfN: Double, n: Int):(Int,( Double,
 Double, Double)) ={

 val mean = sumOfN/n
 val varience = sumOfSquare/n - math.pow(mean,2)
 return (n, (mean, varience, math.sqrt(varience)))
 }

 def main(args: Array[String]) {
 if (args.length < 3) {
   System.err.println("Usage: StreamAnomalyDetector <master> <hostname> <port>")
   System.exit(1)
 }
 //Setting system properties
 //System.setProperty("spark.cores.max", "3")
 System.setProperty("spark.executor.memory", "5g")

 // Create the context
 val ssc = new StreamingContext(args(0), "StreamAnomalyDetector",
 Milliseconds(1000),
 System.getenv("SPARK_HOME"),
 List("target/scalaad-1.0-SNAPSHOT-jar-with-dependencies.jar"))

 //hdfs path to checkpoint old data

 ssc.checkpoint("hdfs://host-10-20-20-17.novalocal:9000/user/hduser/checkpointing/")

 val eegStreams = new
 Array[org.apache.spark.streaming.dstream.DStream[String]](args.length - 2)
 //array for multiple streams

 // Create the NetworkInputDStream
 for (a <- 0 to (args.length - 3))
 {
 eegStreams(a) = ssc.socketTextStream(args(1), args(a+2).toInt,
 StorageLevel.MEMORY_AND_DISK_SER) //Multiple DStreams into Array
 val sums = eegStreams(a).map(x => (math.pow(x.toDouble, 2),
 x.toDouble, 1)).reduceByWindow((a, b) => (a._1 + b._1, a._2 + b._2, a._3 +
 b._3), (a, b) => (a._1 - b._1, a._2 - b._2, a._3 - b._3), Seconds(4),
 Seconds(4))
 val meanAndSD = sums.map(x => calculate(x._1, x._2, x._3))

 meanAndSD.saveAsTextFiles("hdfs://host-10-20-20-17.novalocal:9000/user/hduser/output/"
 + (a + 1))
 }


 ssc.start()
 ssc.awaitTermination()
 }
 }


 Regards,
 Laeeq


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org



Re: Stopping StreamingContext does not kill receiver

2014-08-06 Thread Tathagata Das
Can you give the stack trace?

This was the fix for the twitter stream.

https://github.com/apache/spark/pull/1577/files

You could try doing the same.

TD



On Wed, Aug 6, 2014 at 2:41 PM, lbustelo g...@bustelos.com wrote:

 I'm running on spark 1.0.0 and I see a similar problem when using the
 socketTextStream receiver. The ReceiverTracker task sticks around after a
 ssc.stop(false).



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Stopping-StreamingContext-does-not-kill-receiver-tp9522p11587.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Spark Streaming + reduceByWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration

2014-08-06 Thread salemi
Hi,
I have a DStream called eventData and it contains a set of Data objects
defined as follows:

case class Data(startDate: Long, endDate: Long, className: String, id:
String, state: String)

What would the reducer and inverse reducer functions look like if I wanted
to add the data for the current 3 seconds and filter out the last 3 seconds of
data?

eventData.reduceByWindow(/reduceFunc/, /invReduceFunc/, Minutes(15),
Seconds(3))

Thanks
Ali



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-reduceByWindow-reduceFunc-invReduceFunc-windowDuration-slideDuration-tp11591.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Naive Bayes parameters

2014-08-06 Thread SK

1) How is the minPartitions parameter in NaiveBayes example used? What is
the default value?

2) Why is the  numFeatures specified as a parameter? Can this not be
obtained from the data? This parameter is not specified for the other MLlib
algorithms.  

thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Naive-Bayes-parameters-tp11592.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming + reduceByWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration

2014-08-06 Thread Tathagata Das
Why isn't a simple window function sufficient?

eventData.window(Minutes(15), Seconds(3)) will keep generating RDDs every 3
seconds, each containing the last 15 minutes of data.

TD
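
A minimal sketch of that, where the socket source and CSV parsing are only
placeholders for however eventData is actually built:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

object WindowedEvents {
  case class Data(startDate: Long, endDate: Long, className: String, id: String, state: String)

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("WindowedEvents").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(3))

    // One comma-separated event per line on localhost:9999 (illustrative source).
    val eventData = ssc.socketTextStream("localhost", 9999).map { line =>
      val f = line.split(",")
      Data(f(0).toLong, f(1).toLong, f(2), f(3), f(4))
    }

    // Every 3 seconds, an RDD covering the last 15 minutes of events.
    val last15Min = eventData.window(Minutes(15), Seconds(3))
    last15Min.count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}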


On Wed, Aug 6, 2014 at 3:43 PM, salemi alireza.sal...@udo.edu wrote:

 Hi,
 I have a  DStream called eventData and it contains set of  Data objects
 defined as followed:

 case class Data(startDate: Long, endDate: Long, className: String, id:
 String, state: String)

 How would the reducer and inverse reducer functions look like if I would
 like to add the data for current 3 second and filter out the last 3 second
 data?

 eventData.reduceByWindow(/reduceFunc/, /invReduceFunc/, Minutes(15),
 Seconds(3))

 Thanks
 Ali



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-reduceByWindow-reduceFunc-invReduceFunc-windowDuration-slideDuration-tp11591.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Trying to make sense of the actual executed code

2014-08-06 Thread Tom
Hi,

I am trying to look at for instance the following SQL query in Spark 1.1:
SELECT table.key, table.value, table2.value FROM table2 JOIN table WHERE
table2.key = table.key
When I look at the output, I see that there are several stages, and several
tasks per stage. The tasks have a TID; I do not see such a thing for a
stage. I see the input splits of the files and the start, running and finished
messages for the tasks. But what I really want to know is the following:
which maps, shuffles and reduces are performed in which order, and where can I see
the actual executed code per task/stage? The in-between files/RDDs would be a
bonus!

Thanks in advance,

Tom
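
Two things that may help, sketched under the assumption that the query is run
through a SQLContext as a SchemaRDD (queryExecution is a developer API, so its
output format may change):

// Assuming sqlContext and the two registered tables from the question.
val result = sqlContext.sql(
  "SELECT table.key, table.value, table2.value FROM table2 JOIN table WHERE table2.key = table.key")

// The logical plan and the physical plan Spark actually executes:
println(result.queryExecution)

// The chain of RDDs behind the query:
println(result.toDebugString)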



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Trying-to-make-sense-of-the-actual-executed-code-tp11594.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Using Python IDE for Spark Application Development

2014-08-06 Thread Sathish Kumaran Vairavelu
Hello,

I am trying to use the python IDE PyCharm for Spark application
development. How can I use pyspark with Python IDE? Can anyone help me with
this?


Thanks

Sathish


Re: Using Python IDE for Spark Application Development

2014-08-06 Thread Mohit Singh
My naive set up..
Adding
import os, sys
os.environ['SPARK_HOME'] = "/path/to/spark"
sys.path.append("/path/to/spark/python")
on top of my script.
from pyspark import SparkContext
from pyspark import SparkConf
Execution works from within pycharm...

Though my next step is to figure out autocompletion and I bet there are
better ways to develop apps for spark..



On Wed, Aug 6, 2014 at 4:16 PM, Sathish Kumaran Vairavelu 
vsathishkuma...@gmail.com wrote:

 Hello,

 I am trying to use the python IDE PyCharm for Spark application
 development. How can I use pyspark with Python IDE? Can anyone help me with
 this?


 Thanks

 Sathish





-- 
Mohit

When you want success as badly as you want the air, then you will get it.
There is no other secret of success.
-Socrates


Hive 11 / CDH 4.6 / Spark 0.9.1 dilemma

2014-08-06 Thread Anurag Tangri
I posted this in cdh-user mailing list yesterday and think this should have
been the right audience for this:

=

Hi All,
Not sure if anyone else faced this same issue or not.

We installed CDH 4.6 that uses Hive 0.10.

And we have Spark 0.9.1 that comes with Hive 11.

Now our hive jobs that work on CDH, fail in Shark.

Anyone else facing same issues and any work-arounds ?

Can we re-compile shark 0.9.1 with hive 10 or compile hive 11 on CDH 4.6 ?



Thanks,
Anurag Tangri


Re: Hive 11 / CDH 4.6 / Spark 0.9.1 dilemma

2014-08-06 Thread Sean Owen
I haven't tried any of this, mind you, but my guess is that your options,
from least painful and most likely to work onwards, are:

- Get Spark / Shark to compile against Hive 0.10
- Shade Hive 0.11 into Spark
- Update to CDH5.0+

I don't think there will be more updated releases of Shark or
Spark-on-CDH4, so you may want to be moving forward anyway.


On Thu, Aug 7, 2014 at 12:46 AM, Anurag Tangri atan...@groupon.com wrote:

 I posted this in cdh-user mailing list yesterday and think this should
 have been the right audience for this:

 =

 Hi All,
 Not sure if anyone else faced this same issue or not.

 We installed CDH 4.6 that uses Hive 0.10.

 And we have Spark 0.9.1 that comes with Hive 11.

 Now our hive jobs that work on CDH, fail in Shark.

 Anyone else facing same issues and any work-arounds ?

 Can we re-compile shark 0.9.1 with hive 10 or compile hive 11 on CDH 4.6 ?



 Thanks,
 Anurag Tangri




Re: RDD to DStream

2014-08-06 Thread Tathagata Das
Hey Aniket,

Great thoughts! I understand the use case. But as you have realized yourself,
it is not trivial to cleanly stream an RDD as a DStream. Since RDD
operations are defined to be scan-based, it is not efficient to define an RDD
based on slices of data within a partition of another RDD using pure RDD
transformations. What you have done is a decent, and probably the only
feasible, solution, with its limitations.

Also, the requirements for converting a batch of data to a stream of data can
be pretty diverse: what rate, how many events per batch, how many batches,
is it efficient? Hence, it is not trivial to define a good, clean public
API for that. If anyone has any thoughts, ideas, etc. on this, you are more
than welcome to share them.

TD


On Mon, Aug 4, 2014 at 12:43 AM, Aniket Bhatnagar 
aniket.bhatna...@gmail.com wrote:

 The use case for converting an RDD into a DStream is that I want to simulate
 a stream from already persisted data for testing analytics. It is trivial
 to create an RDD from any persisted data, but not so much a DStream.
 Hence my idea to create a DStream from an RDD. For example, let's say you
 are trying to implement analytics on time series data using the Lambda
 architecture. This means you would have to implement the same analytics on
 streaming data (in streaming mode) as well as persisted data (in batch
 mode). The workflow for implementing the analytics would be to first
 implement it in batch mode using RDD operations and then simulate a stream to
 test the analytics in streaming mode. The simulated stream should produce the
 elements at a specified rate. So the solution may be to read data into an RDD,
 split (chunk) it into multiple RDDs, with each RDD holding the number of
 elements that need to be streamed per time unit, and then finally stream
 each RDD using the compute function.

 The problem with using QueueInputDStream is that it will stream data as
 per the batch duration specified in the streaming context, and one cannot
 specify a custom slide duration. Moreover, the class QueueInputDStream is
 private to the streaming package, so I can't really use it/extend it from an
 external package. Also, I could not find a good solution to split an RDD into
 equally sized smaller RDDs that can be fed into an extended version of
 QueueInputDStream.

 Finally, here is what I came up with:

 import scala.reflect.ClassTag
 import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming.{Duration, StreamingContext, Time}
 import org.apache.spark.streaming.dstream.{DStream, InputDStream}

 class RDDExtension[T: ClassTag](rdd: RDD[T]) {
   def toStream(streamingContext: StreamingContext, chunkSize: Int,
                slideDurationMilli: Option[Long] = None): DStream[T] = {
     new InputDStream[T](streamingContext) {

       // WARNING: each partition must fit in the RAM of the local machine.
       private val iterator = rdd.toLocalIterator
       private val grouped = iterator.grouped(chunkSize)

       override def start(): Unit = {}

       override def stop(): Unit = {}

       override def compute(validTime: Time): Option[RDD[T]] = {
         if (grouped.hasNext) {
           Some(rdd.sparkContext.parallelize(grouped.next()))
         } else {
           None
         }
       }

       override def slideDuration = {
         slideDurationMilli.map(duration => new Duration(duration))
           .getOrElse(super.slideDuration)
       }
     }
   }
 }

 This aims to stream chunkSize elements every slideDurationMilli
 milliseconds (defaulting to the batch duration of the streaming context). It's
 still not perfect (for example, the streaming is not precise), but given that
 this will only be used for testing purposes, I'm not looking for ways to
 further optimize it.
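 A hedged usage sketch of the above, assuming the imports from the snippet plus
 org.apache.spark.streaming.Seconds, an existing SparkContext sc, and an
 illustrative input path; the implicit conversion is not from the original post:

 implicit def toRDDExtension[T: ClassTag](rdd: RDD[T]): RDDExtension[T] =
   new RDDExtension(rdd)

 val ssc = new StreamingContext(sc, Seconds(3))
 val historical: RDD[String] = sc.textFile("hdfs:///data/events")  // hypothetical path
 // Replay roughly 100 historical records per second through the streaming job.
 historical.toStream(ssc, chunkSize = 100, slideDurationMilli = Some(1000L)).print()
 ssc.start()
 ssc.awaitTermination()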

 Thanks,
 Aniket



 On 2 August 2014 04:07, Mayur Rustagi mayur.rust...@gmail.com wrote:

 Nice question :)
 Ideally you should use the queue stream interface to push RDDs into a queue and
 then Spark Streaming can handle the rest.
 Though why are you looking to convert an RDD to a DStream? Another workaround
 folks use is to source the DStream from folders and move files that they need
 reprocessed back into the folder; it's a hack but much less headache.

 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
 @mayur_rustagi https://twitter.com/mayur_rustagi



 On Fri, Aug 1, 2014 at 10:21 AM, Aniket Bhatnagar 
 aniket.bhatna...@gmail.com wrote:

 Hi everyone

 I haven't been receiving replies to my queries in the distribution list.
 Not pissed but I am actually curious to know if my messages are actually
 going through or not. Can someone please confirm that my msgs are getting
 delivered via this distribution list?

 Thanks,
 Aniket


 On 1 August 2014 13:55, Aniket Bhatnagar aniket.bhatna...@gmail.com
 wrote:

 Sometimes it is useful to convert an RDD into a DStream for testing
 purposes (generating DStreams from historical data, etc.). Is there an easy
 way to do this?

 I could come up with the following inefficient way but am not sure if there
 is a better way to achieve this. Thoughts?

 class RDDExtension[T](rdd: RDD[T]) {

   def chunked(chunkSize: Int): RDD[Seq[T]] = {
     rdd.mapPartitions(partitionItr => partitionItr.grouped(chunkSize))
   }

   def skipFirst(): RDD[T] = {
 

Re: Trying to make sense of the actual executed code

2014-08-06 Thread Michael Armbrust
This is maybe not exactly what you are asking for, but you might consider
looking at the queryExecution (a developer API that shows how the query is
analyzed / executed)

sql(...).queryExecution
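A rough sketch of what that looks like in practice, assuming a SQLContext named
sqlContext and the two tables already registered (the query is the one from the
original post):

val joined = sqlContext.sql(
  "SELECT table.key, table.value, table2.value FROM table2 JOIN table WHERE table2.key = table.key")
// Developer API: the QueryExecution's toString shows the parsed, analyzed and
// optimized logical plans plus the chosen physical plan.
println(joined.queryExecution)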


On Wed, Aug 6, 2014 at 3:55 PM, Tom thubregt...@gmail.com wrote:

 Hi,

 I am trying to look at for instance the following SQL query in Spark 1.1:
 SELECT table.key, table.value, table2.value FROM table2 JOIN table WHERE
 table2.key = table.key
 When I look at the output, I see that there are several stages, and several
 tasks per stage. The tasks have a TID, I do not see such a thing for a
 stage. I see the input split of the files and start, running and finished
 messages for the tasks. But what I really want to know is the following:
 Which map, shuffle and reduces are performed in which order/where can I see
 the actual executed code per task/stage. In between files/rdd's would be a
 bonus!

 Thanks in advance,

 Tom



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Trying-to-make-sense-of-the-actual-executed-code-tp11594.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Fwd: Trying to make sense of the actual executed code

2014-08-06 Thread Tobias Pfeiffer
(Forgot to include the mailing list in my reply. Here it is.)


Hi,

On Thu, Aug 7, 2014 at 7:55 AM, Tom thubregt...@gmail.com wrote:

 When I look at the output, I see that there are several stages, and several
 tasks per stage. The tasks have a TID, I do not see such a thing for a
 stage.


They should have. In my logs, for example, I see something like

INFO  scheduler.DAGScheduler - Submitting Stage 1 (MapPartitionsRDD[4] at
reduceByKey at SimpleSpark.scala:21), which has no missing parents
INFO  scheduler.DAGScheduler - Submitting Stage 0 (MapPartitionsRDD[6] at
reduceByKey at SimpleSpark.scala:21), which is now runnable


 But what I really want to know is the following:
 Which map, shuffle and reduces are performed in which order/where can I see
 the actual executed code per task/stage. In between files/rdd's would be a
 bonus!


I would also be interested in that, although I think it's quite hard to
understand what is actually being executed. I dug a bit into that
yesterday, and even the simple WordCount (flatMap, map, reduceByKey, max)
is already quite tough to understand. For example, reduceByKey consists of
three transformations (local reduceByKey, repartition by key, another local
reduceByKey), one of which happens in one stage, the other two in a
different stage. I would love to see a good visualization of that (I wonder
how the developers got their head around that without such a tool), but I
am not aware of any.
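For what it's worth, RDD.toDebugString at least gives a textual view of the
lineage that the stages are cut from; a rough sketch for the WordCount case,
with a hypothetical input path:

val counts = sc.textFile("hdfs:///input/text")
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
// Prints the chain of RDDs; shuffle dependencies (e.g. the ShuffledRDD) mark
// the boundaries where the DAGScheduler cuts stages.
println(counts.toDebugString)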

Tobias


Re: Stopping StreamingContext does not kill receiver

2014-08-06 Thread Tathagata Das
I narrowed down the error. Unfortunately this is not a quick fix. I have
opened a JIRA for this.
https://issues.apache.org/jira/browse/SPARK-2892


On Wed, Aug 6, 2014 at 3:59 PM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 Okay let me give it a shot.


 On Wed, Aug 6, 2014 at 3:57 PM, lbustelo g...@bustelos.com wrote:

 Sorry about the screenshot… but that is what I have handy right now. You can
 see that we get a WARN and it ultimately says that it stopped successfully.
 When looking at the application in the Spark UI, it still shows the
 ReceiverTracker task running.

 It is easy to recreate. On the Spark REPL we are running a modified version
 of
 https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala.

 Then do a ssc.stop(false).

 
 http://apache-spark-user-list.1001560.n3.nabble.com/file/n11595/Screen_Shot_2014-08-06_at_4.png
 



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Stopping-StreamingContext-does-not-kill-receiver-tp9522p11595.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Regularization parameters

2014-08-06 Thread Burak Yavuz
Hi,

That is interesting. Would you please share some code on how you are setting 
the regularization type, regularization parameters and running Logistic 
Regression?

Thanks,
Burak
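For reference, a minimal sketch of how the regularization type and parameter are
typically wired up in MLlib around this version, using LogisticRegressionWithSGD
with L1Updater/SquaredL2Updater; the helper function and training RDD are
illustrative, not the poster's actual code:

import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
import org.apache.spark.mllib.optimization.{L1Updater, SquaredL2Updater}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

def train(training: RDD[LabeledPoint], regParam: Double, useL1: Boolean) = {
  val lr = new LogisticRegressionWithSGD()
  lr.optimizer
    .setNumIterations(100)
    .setRegParam(regParam)
    .setUpdater(if (useL1) new L1Updater else new SquaredL2Updater)
  val model = lr.run(training)
  // Clear the built-in threshold so predict() returns raw scores, which is what
  // AUC and a manual 0.5 cut-off need.
  model.clearThreshold()
  model
}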

- Original Message -
From: SK skrishna...@gmail.com
To: u...@spark.incubator.apache.org
Sent: Wednesday, August 6, 2014 6:18:43 PM
Subject: Regularization parameters

Hi,

I tried different regularization parameter values with Logistic Regression
for binary classification of my dataset and would like to understand the
following results:

regType = L2, regParam = 0.0: I am getting AUC = 0.80 and accuracy of 80%
regType = L1, regParam = 0.0: I am getting AUC = 0.80 and accuracy of 50%

To calculate accuracy I am using 0.5 as the threshold: prediction < 0.5 is class
0, and prediction >= 0.5 is class 1.

regParam = 0.0 implies I am not using any regularization, is that correct?
If so, it should not matter whether I specify L1 or L2; I should get the
same results. So why is the accuracy value different?

thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Regularization-parameters-tp11601.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



PySpark + executor lost

2014-08-06 Thread Avishek Saha
Hi,

I get a lot of executor lost errors for saveAsTextFile with PySpark
and Hadoop 2.4.

For small datasets this error occurs, but since the dataset is small the
output eventually gets written to the file.
For large datasets, it takes forever to write the final output.

Any help is appreciated.
Avishek

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Using Python IDE for Spark Application Development

2014-08-06 Thread Sathish Kumaran Vairavelu
Mohit, this doesn't seem to be working; can you please provide more
details? When I use from pyspark import SparkContext, it shows as disabled in
PyCharm. I use PyCharm Community Edition. Where should I set the
environment variables: in the same Python script or a different Python script?

Also, should I run a local Spark cluster so that the Spark program runs on top
of it?


Appreciate your help

-Sathish


On Wed, Aug 6, 2014 at 6:22 PM, Mohit Singh mohit1...@gmail.com wrote:

 My naive set up..
 Adding
 os.environ['SPARK_HOME'] = "/path/to/spark"
 sys.path.append("/path/to/spark/python")
 on top of my script, followed by
 from pyspark import SparkContext
 from pyspark import SparkConf
 Execution works from within PyCharm...

 Though my next step is to figure out autocompletion and I bet there are
 better ways to develop apps for spark..



 On Wed, Aug 6, 2014 at 4:16 PM, Sathish Kumaran Vairavelu 
 vsathishkuma...@gmail.com wrote:

 Hello,

 I am trying to use the python IDE PyCharm for Spark application
 development. How can I use pyspark with Python IDE? Can anyone help me with
 this?


 Thanks

 Sathish





 --
 Mohit

 When you want success as badly as you want the air, then you will get it.
 There is no other secret of success.
 -Socrates



memory issue on standalone master

2014-08-06 Thread BQ
Hi There,

I'm starting to use Spark and have a rookie problem. I used standalone mode with
the master only, and here is what I did:

./sbin/start-master.sh 

./bin/pyspark

When I tried the wordcount.py example, whose input file is a bit big, I got the
out of memory error, which I excerpted and pasted below.
I have 60 GB of RAM; in my log file, I found this:

Spark Command: java -cp
::/home/ubuntu/spark/conf:/home/ubuntu/spark/assembly/target/scala-2.10/spark-assembly-1.1.0-SNAPSHOT-hadoop1.0.4.jar
-XX:MaxPermSize=128m -Dspark.akka.logLifecycleEvents=true -Xms512m -Xmx512m
org.apache.spark.deploy.master.Master --ip ip-10-123-146-183 --port 7077
--webui-port 8080

Any help please?

..
14/08/07 02:47:06 INFO PythonRDD: Times: total = 7008, boot = 10, init =
106, finish = 6892
14/08/07 02:47:06 ERROR Executor: Exception in task 2.0 in stage 0.0 (TID
122)
java.lang.OutOfMemoryError: Java heap space
at com.esotericsoftware.kryo.io.Output.require(Output.java:142)
at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220)
at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:206)
at
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:29)
at
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:18)
at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
at
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:312)
at
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at
org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:148)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:209)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
14/08/07 02:47:06 ERROR Executor: Exception in task 7.0 in stage 0.0 (TID
127)
java.lang.OutOfMemoryError: Java heap space
at com.esotericsoftware.kryo.io.Output.require(Output.java:142)
at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220)
at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:206)
at
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:29)
at
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:18)
at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
at
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:312)
at
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at
org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:148)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:209)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/memory-issue-on-standalone-master-tp11610.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Regularization parameters

2014-08-06 Thread Mohit Singh
One possible straightforward explanation might be that your solution(s) are
stuck in local minima, and depending on your weight initialization you
are getting different parameters?
Maybe use the same initial weights for both runs...
or
I would probably test the execution with a synthetic dataset with a known
global solution?



On Wed, Aug 6, 2014 at 7:12 PM, Burak Yavuz bya...@stanford.edu wrote:

 Hi,

 That is interesting. Would you please share some code on how you are
 setting the regularization type, regularization parameters and running
 Logistic Regression?

 Thanks,
 Burak

 - Original Message -
 From: SK skrishna...@gmail.com
 To: u...@spark.incubator.apache.org
 Sent: Wednesday, August 6, 2014 6:18:43 PM
 Subject: Regularization parameters

 Hi,

 I tried different regularization parameter values with Logistic Regression
 for binary classification of my dataset and would like to understand the
 following results:

 regType = L2, regParam = 0.0: I am getting AUC = 0.80 and accuracy of 80%
 regType = L1, regParam = 0.0: I am getting AUC = 0.80 and accuracy of 50%

 To calculate accuracy I am using 0.5 as the threshold: prediction < 0.5 is class
 0, and prediction >= 0.5 is class 1.

 regParam = 0.0 implies I am not using any regularization, is that correct?
 If so, it should not matter whether I specify L1 or L2; I should get the
 same results. So why is the accuracy value different?

 thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Regularization-parameters-tp11601.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org



 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
Mohit

When you want success as badly as you want the air, then you will get it.
There is no other secret of success.
-Socrates


Re: Spark Streaming + reduceByWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration

2014-08-06 Thread salemi
Hi,

The reason I am looking to do it differently is that the latency and
batch processing times are bad, about 40 seconds. I took the times from the
Streaming UI.

As you suggested, I tried the window as below and still the times are bad.

val dStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap)
val eventData = dStream.map(_._2).map(_.split(",")).map(data =>
  Data(data(0), data(1), data(2), data(3), data(4))).window(Minutes(15), Seconds(3))

val result = eventData.transform((rdd, time) => {
  rdd.registerAsTable("data")
  sql("SELECT count(state) FROM data WHERE state='Active'")
})
result.print()

Any suggestions?

Ali







--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-reduceByWindow-reduceFunc-invReduceFunc-windowDuration-slideDuration-tp11591p11612.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Column width limits?

2014-08-06 Thread Daniel, Ronald (ELS-SDG)
Assume I want to make a PairRDD whose keys are S3 URLs and whose values are 
Strings holding the contents of those (UTF-8) files, but NOT split into lines. 
Are there length limits on those files/Strings? 1 MB? 16 MB? 4 GB? 1 TB?
Similarly, can such a thing be registered as a table so that I can use substr() 
to pick out pieces of the string?
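A rough sketch of one way to build such a pair RDD; sc.wholeTextFiles keeps each
file as one (path, contents) pair, the bucket path is purely illustrative, and
each value is a single in-memory Java String (so it is bounded by executor
memory and by the roughly 2-billion-character String limit):

// RDD[(String, String)] of (file path, whole file contents, not split into lines)
val files = sc.wholeTextFiles("s3n://my-bucket/docs/")

// Registering it as a table so substr() can slice the value. substr() is
// available at least through HiveContext's HiveQL; support in the plain SQL
// parser may depend on the Spark version.
case class Doc(url: String, body: String)
import sqlContext._  // assumes a SQLContext named sqlContext (createSchemaRDD, sql)
files.map { case (url, body) => Doc(url, body) }.registerAsTable("docs")
sql("SELECT url, substr(body, 1, 100) FROM docs").collect()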

Thanks,
Ron



Re: fail to run LBFGS on 5G KDD data in Spark 1.0.1?

2014-08-06 Thread Lizhengbing (bing, BIPA)
I have tested it in spark-1.1.0-SNAPSHOT.
It is OK now.

From: Xiangrui Meng [mailto:men...@gmail.com]
Sent: August 6, 2014 23:12
To: Lizhengbing (bing, BIPA)
Cc: user@spark.apache.org
Subject: Re: fail to run LBFGS on 5G KDD data in Spark 1.0.1?

Do you mind testing 1.1-SNAPSHOT and allocating more memory to the driver? I 
think the problem is with the feature dimension. KDD data has more than 20M 
features and in v1.0.1, the driver collects the partial gradients one by one, 
sums them up, does the update, and then sends the new weights back to executors 
one by one. In 1.1-SNAPSHOT, we switched to multi-level tree aggregation and 
torrent broadcasting.
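A rough sketch of the difference, with a toy Double "gradient" per record;
treeAggregate is part of the core RDD API in later Spark releases, while around
1.1 the equivalent primitive lived in MLlib's RDD helpers:

// Stand-in for per-record partial gradients.
val grads = sc.parallelize(1 to 1000000).map(_.toDouble)

// Flat aggregate (the v1.0.x pattern): every partition's partial sum is sent
// straight to the driver and combined there.
val flatSum = grads.aggregate(0.0)(_ + _, _ + _)

// Tree aggregation: partial sums are combined in a multi-level tree on the
// executors first, so the driver receives only a few pre-combined values.
val treeSum = grads.treeAggregate(0.0)(_ + _, _ + _)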

For the driver memory, you can set it with spark-submit using `--driver-memory 
30g`. It can be confirmed by visiting the storage tab in the web UI.

-Xiangrui

On Wed, Aug 6, 2014 at 1:58 AM, Lizhengbing (bing, BIPA) 
zhengbing...@huawei.com wrote:
1. I don't use spark-submit to run my job; I use a SparkContext directly:
val conf = new SparkConf()
  .setMaster("spark://123d101suse11sp3:7077")
  .setAppName("LBFGS")
  .set("spark.executor.memory", "30g")
  .set("spark.akka.frameSize", "20")
val sc = new SparkContext(conf)

2. I use the KDD data; its size is about 5 GB.

3. After I execute LBFGS.runLBFGS, at stage 7, the problem occurs:


14/08/06 16:44:45 INFO DAGScheduler: Failed to run aggregate at LBFGS.scala:201
Exception in thread main org.apache.spark.SparkException: Job aborted due to 
stage failure: Task 7.0:12 failed 4 times, most recent failure: TID 304 on host 
123d103suse11sp3 failed for unknown reason
Driver stacktrace:
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1044)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1028)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1026)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1026)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634)
at scala.Option.foreach(Option.scala:236)
at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:634)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1229)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)



spark-cassandra-connector issue

2014-08-06 Thread Gary Zhao
Hello

I'm trying to modify the Spark sample app to integrate with Cassandra; however,
I see an exception when submitting the app. Does anyone know why it happens?

Exception in thread main java.lang.NoClassDefFoundError:
com/datastax/spark/connector/rdd/reader/RowReaderFactory
at SimpleApp.main(SimpleApp.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:303)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException:
com.datastax.spark.connector.rdd.reader.RowReaderFactory
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
... 8 more
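
As an aside, a NoClassDefFoundError for connector classes at launch usually
means the spark-cassandra-connector jar (and its dependencies) is not on the
application classpath. One hedged way to handle it is to bundle the connector
into the application jar (e.g. with sbt-assembly) or pass it via spark-submit
--jars; the sbt coordinates and versions below are assumptions to check against
the connector documentation for your Spark version:

// build.sbt (sketch)
libraryDependencies ++= Seq(
  "org.apache.spark"   %% "spark-core"                % "1.0.2" % "provided",
  "com.datastax.spark" %% "spark-cassandra-connector" % "1.0.0"  // version is an assumption
)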


Source code:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import com.datastax.spark.connector._

object SimpleApp {

  def main(args: Array[String]) {

    val conf = new SparkConf(true)
      .set("spark.cassandra.connection.host", "10.20.132.44")
      .setAppName("Simple Application")

    // Should be some file on your system
    val logFile = "/home/gzhao/spark/spark-1.0.2-bin-hadoop1/README.md"

    val sc = new SparkContext("spark://mcs-spark-slave1-staging:7077",
      "idfa_map", conf)

    val rdd = sc.cassandraTable("idfa_map", "bcookie_idfa")

    val logData = sc.textFile(logFile, 2).cache()

    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()

    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}