Re: trouble with jsonRDD and jsonFile in pyspark
There is a PR to fix this: https://github.com/apache/spark/pull/1802

On Tue, Aug 5, 2014 at 10:11 PM, Brad Miller bmill...@eecs.berkeley.edu wrote:

  I concur that printSchema works; it just seems to be operations that use the data where the trouble happens. Thanks for posting the bug. -Brad

On Tue, Aug 5, 2014 at 10:05 PM, Yin Huai yh...@databricks.com wrote:

  I tried jsonRDD(...).printSchema() and it worked. The problem seems to be that when we take the data back to the Python side, SchemaRDD#javaToPython fails on your cases. I have created https://issues.apache.org/jira/browse/SPARK-2875 to track it. Thanks, Yin

On Tue, Aug 5, 2014 at 9:20 PM, Brad Miller bmill...@eecs.berkeley.edu wrote:

  Hi All,

  I checked out and built master. Note that Maven had a problem building Kafka (in my case, at least); I was unable to fix this easily, so I moved on since it seemed unlikely to have any influence on the problem at hand.

  Master improves functionality (including the example Nicholas just demonstrated), but unfortunately there still seems to be a bug related to using dictionaries as values. I've put some code below to illustrate the bug.

  # dictionary as value works fine
  print sqlCtx.jsonRDD(sc.parallelize(['{"key0": {"key1": "value"}}'])).collect()
  [Row(key0=Row(key1=u'value'))]

  # dictionary as value works fine, even when inner keys are varied
  print sqlCtx.jsonRDD(sc.parallelize(['{"key0": {"key1": "value1"}}', '{"key0": {"key2": "value2"}}'])).collect()
  [Row(key0=Row(key1=u'value1', key2=None)), Row(key0=Row(key1=None, key2=u'value2'))]

  # dictionary as value works fine when inner keys are missing and outer key is present
  print sqlCtx.jsonRDD(sc.parallelize(['{"key0": {}}', '{"key0": {"key1": "value1"}}'])).collect()
  [Row(key0=Row(key1=None)), Row(key0=Row(key1=u'value1'))]

  # dictionary as value FAILS when outer key is missing
  print sqlCtx.jsonRDD(sc.parallelize(['{}', '{"key0": {"key1": "value1"}}'])).collect()
  Py4JJavaError: An error occurred while calling o84.collect.
  : org.apache.spark.SparkException: Job aborted due to stage failure: Task 14 in stage 7.0 failed 4 times, most recent failure: Lost task 14.3 in stage 7.0 (TID 242, engelland.research.intel-research.net): java.lang.NullPointerException...

  # dictionary as value FAILS when outer key is present with null value
  print sqlCtx.jsonRDD(sc.parallelize(['{"key0": null}', '{"key0": {"key1": "value1"}}'])).collect()
  Py4JJavaError: An error occurred while calling o98.collect.
  : org.apache.spark.SparkException: Job aborted due to stage failure: Task 14 in stage 9.0 failed 4 times, most recent failure: Lost task 14.3 in stage 9.0 (TID 305, kunitz.research.intel-research.net): java.lang.NullPointerException...

  # nested lists work even when outer key is missing
  print sqlCtx.jsonRDD(sc.parallelize(['{}', '{"key0": [["item0", "item1"], ["item2", "item3"]]}'])).collect()
  [Row(key0=None), Row(key0=[[u'item0', u'item1'], [u'item2', u'item3']])]

  Is anyone able to replicate this behavior?

  -Brad

On Tue, Aug 5, 2014 at 6:11 PM, Michael Armbrust mich...@databricks.com wrote:

  We try to keep master very stable, but this is where active development happens. YMMV, but a lot of people do run very close to master without incident (myself included). branch-1.0 has been cut for a while and we only merge bug fixes into it (this is more strict for non-alpha components like Spark core). For Spark SQL, this branch is pretty far behind, as the project is very young and we are fixing bugs / adding features very rapidly compared with Spark core. branch-1.1 was just cut and is being QAed for a release; at this point it's likely the same as master, but that will change as features start getting added to master in the coming weeks.

On Tue, Aug 5, 2014 at 5:38 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote:

  collect() works, too.

  sqlContext.jsonRDD(sc.parallelize(['{"foo": [[1,2,3], [4,5,6]]}', '{"foo": [[1,2,3], [4,5,6]]}'])).collect()
  [Row(foo=[[1, 2, 3], [4, 5, 6]]), Row(foo=[[1, 2, 3], [4, 5, 6]])]

  Can’t answer your question about branch stability, though. Spark is a very active project, so stuff is happening all the time. Nick

On Tue, Aug 5, 2014 at 7:20 PM, Brad Miller bmill...@eecs.berkeley.edu wrote:

  Hi Nick, Can you check that the call to collect() works as well as printSchema()? I actually find that printSchema() works fine, but then it crashes on collect(). In general, should I expect master (which seems to be on branch-1.1) to be any more or less stable than branch-1.0? While it would be great to have this fixed, it would be good to know whether I should expect lots of other instability. best, -Brad

On Tue, Aug 5, 2014 at 4:15 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote:

  This looks to be fixed in master:

  from pyspark.sql import SQLContext
  sqlContext = SQLContext(sc)
  sc.parallelize(['{"foo": [[1,2,3], [4,5,6]]}', '{"foo": [[1,2,3], [4,5,6]]}' ])
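Until the linked fix is released, one possible workaround (a sketch only; the helper name and the default key are illustrative) is to normalize the records on the Python side before handing them to jsonRDD, since Brad's tests show that records whose outer key maps to an empty dict parse fine:

```python
import json

def ensure_outer_key(record_str, key="key0"):
    """Give every JSON record the same top-level shape by filling a
    missing or explicitly-null outer key with an empty dict."""
    record = json.loads(record_str)
    if record.get(key) is None:  # covers both a missing key and an explicit null
        record[key] = {}
    return json.dumps(record)

records = ['{}', '{"key0": null}', '{"key0": {"key1": "value1"}}']
normalized = [ensure_outer_key(r) for r in records]
# normalized[0] and normalized[1] both become '{"key0": {}}'
```

Applied as rdd.map(ensure_outer_key) before sqlCtx.jsonRDD, this should sidestep the NullPointerException at the cost of one extra parse per record.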
Problem reading from S3 in standalone application
Hi, I'm running Spark on an EMR cluster and I'm able to read from S3 using the REPL without problems:

  val input_file = "s3://bucket-name/test_data.txt"
  val rawdata = sc.textFile(input_file)
  val test = rawdata.collect

but when I try to run a simple standalone application reading the same data, I get an error saying that I should provide the access keys:

  import org.apache.spark.SparkContext
  import org.apache.spark.SparkContext._

  object test {
    def main(args: Array[String]) {
      val master = "spark://ec2-xx-xx-xxx-xxx.eu-west-1.compute.amazonaws.com:7077"
      val sparkHome = "/home/hadoop/spark/"
      val sc = new SparkContext(master, "test", sparkHome, Seq())
      val input_file = "s3://bucket-name/test_data.txt"
      val rawdata = sc.textFile(input_file)
      val test = rawdata.collect
      sc.stop()
    }
  }

[error] (run-main-0) java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3 URL, or by setting the fs.s3.awsAccessKeyId or fs.s3.awsSecretAccessKey properties (respectively).
java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3 URL, or by setting the fs.s3.awsAccessKeyId or fs.s3.awsSecretAccessKey properties (respectively).
  at org.apache.hadoop.fs.s3.S3Credentials.initialize(S3Credentials.java:70)
  at org.apache.hadoop.fs.s3.Jets3tFileSystemStore.initialize(Jets3tFileSystemStore.java:93)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)
  at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:190)
  at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:103)
  at com.sun.proxy.$Proxy13.initialize(Unknown Source)
  at org.apache.hadoop.fs.s3.S3FileSystem.initialize(S3FileSystem.java:92)
  at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2397)
  at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:89)
  at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2431)
  at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2413)
  at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368)
  at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
  at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:256)
  at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
  at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:304)
  at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:172)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
  at scala.Option.getOrElse(Option.scala:120)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
  at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
  at scala.Option.getOrElse(Option.scala:120)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1094)
  at org.apache.spark.rdd.RDD.collect(RDD.scala:717)
  at test$.main(test.scala:17)
  at test.main(test.scala)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)

When I add the keys to the file name,

  val input_file = "s3://access key:secret access key@bucket-name/test_data.txt"

I get an "Input path does not exist" error (keys and bucket name changed from the error message, naturally):

[error] (run-main-0) org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: s3://access key:secret access key@bucket-name/test_data.txt
org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: s3://access key:secret access key@bucket-name/test_data.txt
  at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:285)
  at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
  at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:304)
  at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:172)
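For what it's worth, one classic cause of the second error (an assumption here, not confirmed by this thread) is an AWS secret key containing / or +, which silently corrupts the user:password form of the URL. A sketch of percent-encoding the credentials before embedding them; whether a given Hadoop version decodes them correctly varies, so setting fs.s3n.awsAccessKeyId and fs.s3n.awsSecretAccessKey in the Hadoop configuration (or the AWS environment variables) is the safer route:

```python
from urllib.parse import quote

def s3_url_with_keys(bucket, path, access_key, secret_key, scheme="s3n"):
    # Secret keys often contain '/' or '+'; percent-encode both credentials
    # so they survive inside the URL's username:password slot.
    return "%s://%s:%s@%s/%s" % (
        scheme, quote(access_key, safe=""), quote(secret_key, safe=""), bucket, path)

url = s3_url_with_keys("bucket-name", "test_data.txt", "AKIAEXAMPLE", "abc/def+ghi")
# url == "s3n://AKIAEXAMPLE:abc%2Fdef%2Bghi@bucket-name/test_data.txt"
```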
Re: Problem reading from S3 in standalone application
I'm getting the same "Input path does not exist" error even after setting the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables and using the format s3://bucket-name/test_data.txt for the input file. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Problem-reading-from-S3-in-standalone-application-tp11524p11526.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Problem reading from S3 in standalone application
Try s3n://

On Aug 6, 2014, at 12:22 AM, sparkuser2345 hm.spark.u...@gmail.com wrote:

  I'm getting the same "Input path does not exist" error even after setting the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables and using the format s3://bucket-name/test_data.txt for the input file.
Re: Setting spark.executor.memory problem
Hi Andrew, Thank you very much for your solution, which works like a charm, and for the very clear explanation. Grzegorz
fail to run LBFGS on 5G KDD data in Spark 1.0.1?
1. I don't use spark-submit to run my job; I use a SparkContext directly:

  val conf = new SparkConf()
    .setMaster("spark://123d101suse11sp3:7077")
    .setAppName("LBFGS")
    .set("spark.executor.memory", "30g")
    .set("spark.akka.frameSize", "20")
  val sc = new SparkContext(conf)

2. I use the KDD data; its size is about 5G.

3. After I execute LBFGS.runLBFGS, at stage 7, the problem occurs:

14/08/06 16:44:45 INFO DAGScheduler: Failed to run aggregate at LBFGS.scala:201
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 7.0:12 failed 4 times, most recent failure: TID 304 on host 123d103suse11sp3 failed for unknown reason
Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1044)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1028)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1026)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1026)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634)
  at scala.Option.foreach(Option.scala:236)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:634)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1229)
  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
  at akka.actor.ActorCell.invoke(ActorCell.scala:456)
  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
  at akka.dispatch.Mailbox.run(Mailbox.scala:219)
  at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
Spark GraphX remembering old message
Hi, I am trying to compute beliefs for a graph using the GraphX Pregel implementation. My use case: vertices 2, 3, and 4 send messages m2, m3, m4 to vertex 6. At vertex 6 I multiply all the messages, (m2*m3*m4) = m6, and then from vertex 6 the message m6/m2 should be sent back to vertex 2, m6/m3 to vertex 3, and m6/m4 to vertex 4. The Pregel merge function is used to find m6 ((m2*m3*m4) = m6), but after that, when sending the message m6/m2 to vertex 2 in the send-message function, the individual messages m2, m3, and m4 are no longer available, so I cannot compute m6/m2, m6/m3, or m6/m4. Can somebody guide me on how to resolve this using the GraphX Pregel API? Thanks Arun
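One common way to resolve this (a sketch of the idea in plain Python, not GraphX code) is to make the merged message a map keyed by sender instead of a single product; the vertex program can then compute both the total product and each per-sender quotient:

```python
def merge_msg(a, b):
    """Pregel merge step: keep messages as {sender_id: message} so each
    sender's contribution stays recoverable at the receiving vertex."""
    merged = dict(a)
    merged.update(b)
    return merged

# m2, m3, m4 arriving at vertex 6, each wrapped as a one-entry map:
incoming = merge_msg(merge_msg({2: 0.5}, {3: 0.25}), {4: 0.8})

m6 = 1.0
for m in incoming.values():
    m6 *= m  # m6 = m2 * m3 * m4

# Outgoing message back to each sender: the product of everyone else's messages.
outgoing = {sender: m6 / m for sender, m in incoming.items()}
```

The trade-off is that the merged message grows with the in-degree of the vertex, which is usually acceptable for belief propagation but worth keeping in mind on high-degree graphs.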
can't submit my application on standalone spark cluster
Hi all, My name is Andres and I'm starting to use Apache Spark. I try to submit my spark.jar to my cluster using this:

  spark-submit --class net.redborder.spark.RedBorderApplication --master spark://pablo02:7077 redborder-spark-selfcontained.jar

But when I did it, my worker died, and my driver too! This is my driver log:

[INFO] 2014-08-06 06:30:12,025 [Driver-akka.actor.default-dispatcher-3] akka.event.slf4j.Slf4jLogger applyOrElse - Slf4jLogger started
[INFO] 2014-08-06 06:30:12,061 [Driver-akka.actor.default-dispatcher-3] Remoting apply$mcV$sp - Starting remoting
[ERROR] 2014-08-06 06:30:12,089 [Driver-akka.actor.default-dispatcher-6] akka.actor.ActorSystemImpl apply$mcV$sp - Uncaught fatal error from thread [Driver-akka.actor.default-dispatcher-3] shutting down ActorSystem [Driver]
java.lang.VerifyError: (class: org/jboss/netty/channel/socket/nio/NioWorkerPool, method: createWorker signature: (Ljava/util/concurrent/Executor;)Lorg/jboss/netty/channel/socket/nio/AbstractNioWorker;) Wrong return type in function
  at akka.remote.transport.netty.NettyTransport.init(NettyTransport.scala:282)
  at akka.remote.transport.netty.NettyTransport.init(NettyTransport.scala:239)
  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
  at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
  at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
  at java.lang.reflect.Constructor.newInstance(Unknown Source)
  at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$2.apply(DynamicAccess.scala:78)
  at scala.util.Try$.apply(Try.scala:161)
  at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:73)
  at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
  at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
  at scala.util.Success.flatMap(Try.scala:200)
  at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:84)
  at akka.remote.EndpointManager$$anonfun$8.apply(Remoting.scala:618)
  at akka.remote.EndpointManager$$anonfun$8.apply(Remoting.scala:610)
  at scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:722)
  at scala.collection.Iterator$class.foreach(Iterator.scala:727)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
  at scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721)
  at akka.remote.EndpointManager.akka$remote$EndpointManager$$listens(Remoting.scala:610)
  at akka.remote.EndpointManager$$anonfun$receive$2.applyOrElse(Remoting.scala:450)
  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
  at akka.actor.ActorCell.invoke(ActorCell.scala:456)
  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
  at akka.dispatch.Mailbox.run(Mailbox.scala:219)
  at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[INFO] 2014-08-06 06:30:12,093 [Driver-akka.actor.default-dispatcher-5] akka.remote.RemoteActorRefProvider$RemotingTerminator apply$mcV$sp - Shutting down remote daemon.
[INFO] 2014-08-06 06:30:12,095 [Driver-akka.actor.default-dispatcher-5] akka.remote.RemoteActorRefProvider$RemotingTerminator apply$mcV$sp - Remote daemon shut down; proceeding with flushing remote transports.
[INFO] 2014-08-06 06:30:12,102 [Driver-akka.actor.default-dispatcher-3] Remoting apply$mcV$sp - Remoting shut down
[INFO] 2014-08-06 06:30:12,104 [Driver-akka.actor.default-dispatcher-3] akka.remote.RemoteActorRefProvider$RemotingTerminator apply$mcV$sp - Remoting shut down.
[ERROR] [08/06/2014 06:30:22.065] [main] [Remoting] Remoting error: [Startup timed out] [
akka.remote.RemoteTransportException: Startup timed out
  at akka.remote.Remoting.akka$remote$Remoting$$notifyError(Remoting.scala:129)
  at akka.remote.Remoting.start(Remoting.scala:191)
  at akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)
  at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:579)
  at akka.actor.ActorSystemImpl._start(ActorSystem.scala:577)
  at akka.actor.ActorSystemImpl.start(ActorSystem.scala:588)
  at akka.actor.ActorSystem$.apply(ActorSystem.scala:111)
Re: can't submit my application on standalone spark cluster
Looks like a Netty conflict there; most likely you have multiple versions of the Netty jars on the classpath (e.g. netty-3.6.6.Final.jar, netty-3.2.2.Final.jar, netty-all-4.0.13.Final.jar). I believe you only require 3.6.6, so a quick fix would be to remove the rest of them.

Thanks
Best Regards

On Wed, Aug 6, 2014 at 3:05 PM, Andres Gomez Ferrer ago...@redborder.net wrote:

  Hi all, My name is Andres and I'm starting to use Apache Spark. I try to submit my spark.jar to my cluster using spark-submit --class net.redborder.spark.RedBorderApplication --master spark://pablo02:7077 redborder-spark-selfcontained.jar But when I did it, my worker died, and my driver too!
Re: Problem reading from S3 in standalone application
Evan R. Sparks wrote

  Try s3n://

Thanks, that works! In the REPL, I can successfully load the data using both s3:// and s3n://; why the difference? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Problem-reading-from-S3-in-standalone-application-tp11524p11537.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Spark SQL (version 1.1.0-SNAPSHOT) should allow SELECT with duplicated columns
Spark reported a java.lang.IllegalArgumentException with this message:

java.lang.IllegalArgumentException: requirement failed: Found fields with the same name.
  at scala.Predef$.require(Predef.scala:233)
  at org.apache.spark.sql.catalyst.types.StructType.init(dataTypes.scala:317)
  at org.apache.spark.sql.catalyst.types.StructType$.fromAttributes(dataTypes.scala:310)
  at org.apache.spark.sql.parquet.ParquetTypesConverter$.convertToString(ParquetTypes.scala:306)
  at org.apache.spark.sql.parquet.ParquetTableScan.execute(ParquetTableOperations.scala:83)
  at org.apache.spark.sql.execution.Filter.execute(basicOperators.scala:57)
  at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:85)
  at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:433)

After trial and error, it seems to be caused by duplicated columns in my SELECT clause. I made the duplication on purpose so that my code parses correctly. I think we should allow users to specify duplicated columns in the result. -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
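Until Spark SQL relaxes the unique-field-name requirement, one workaround sketch (the helper and the suffix convention are illustrative) is to alias the duplicated columns before the Parquet scan sees them, e.g. SELECT id, score, id AS id_2:

```python
def dedupe_columns(names):
    """Rename duplicated column names (id, score, id) -> (id, score, id_2)
    so every field name in the resulting schema is unique."""
    seen, out = {}, []
    for name in names:
        n = seen.get(name, 0) + 1
        seen[name] = n
        out.append(name if n == 1 else "%s_%d" % (name, n))
    return out

dedupe_columns(["id", "score", "id"])  # -> ['id', 'score', 'id_2']
```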
Re: Spark stream data from kafka topics and output as parquet file on HDFS
Hello, I have referred to https://github.com/dibbhatt/kafka-spark-consumer and I have successfully consumed tuples from Kafka. The tuples are JSON objects, and I want to store those objects in HDFS in Parquet format. Please suggest a sample example for that. Thanks in advance. On Tue, Aug 5, 2014 at 11:55 AM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: You can try this Kafka Spark Consumer, which I recently wrote. It uses the Low Level Kafka Consumer: https://github.com/dibbhatt/kafka-spark-consumer Dibyendu On Tue, Aug 5, 2014 at 12:52 PM, rafeeq s rafeeq.ec...@gmail.com wrote: Hi, I am new to Apache Spark and trying to develop a Spark Streaming program to *stream data from Kafka topics and output it as Parquet files on HDFS*. Please share a *sample reference* program that streams data from Kafka topics and outputs Parquet files on HDFS. Thanks in Advance. Regards, Rafeeq S *(“What you do is what matters, not what you think or say or plan.” )* -- *Regards,* *Mahebub Sayyed*
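In the Spark 1.0-era Python API, the usual route would be sqlCtx.jsonRDD(rdd) followed by saveAsParquetFile(path) inside foreachRDD; I haven't run that combination against this consumer, so treat it as a pointer rather than a recipe. Conceptually, Parquet stores data by column, so a batch of row-oriented JSON tuples has to become columnar somewhere along the way. A plain-Python sketch of that transform (everything here is illustrative):

```python
import json

def rows_to_columns(json_records):
    """Turn a batch of JSON records (row-oriented) into column-oriented
    lists, the layout a Parquet writer works with; fields missing from a
    record become None (null in the written file)."""
    rows = [json.loads(r) for r in json_records]
    fields = sorted({k for row in rows for k in row})
    return {f: [row.get(f) for row in rows] for f in fields}

batch = ['{"user": "a", "clicks": 1}', '{"user": "b"}']
rows_to_columns(batch)  # {'clicks': [1, None], 'user': ['a', 'b']}
```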
Re: Save an RDD to a SQL Database
Hi Vida, It's possible to save an RDD as a Hadoop file using Hadoop output formats. It might be worthwhile to investigate DBOutputFormat and see whether it will work for you. I haven't personally written to a DB this way, but I'd imagine it would be one way to do it. Thanks, Ron Sent from my iPhone On Aug 5, 2014, at 8:29 PM, Vida Ha vid...@gmail.com wrote: Hi, I would like to save an RDD to a SQL database. It seems like this would be a common enough use case. Are there any built-in libraries to do it? Otherwise, I'm just planning on mapping my RDD and having that call a method to write to the database. Given that a lot of records are going to be written, the code would need to be smart and do a batch insert after enough records have accumulated. Does that sound like a reasonable approach? -Vida
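Vida's batching idea can be sketched as a per-partition writer, which is what you'd hand to rdd.foreachPartition so each executor opens its own connection. The sketch uses sqlite3 only so it is self-contained; the table name, schema, and batch size are made up:

```python
import sqlite3

def save_partition(rows, db_path, batch_size=500):
    """Insert one partition's (k, v) rows, flushing in batches of batch_size.
    Intended usage: rdd.foreachPartition(lambda it: save_partition(it, path))."""
    conn = sqlite3.connect(db_path)
    conn.execute("CREATE TABLE IF NOT EXISTS events (k TEXT, v INTEGER)")
    batch = []
    for row in rows:
        batch.append(row)
        if len(batch) >= batch_size:
            conn.executemany("INSERT INTO events VALUES (?, ?)", batch)
            batch = []
    if batch:  # flush the final partial batch
        conn.executemany("INSERT INTO events VALUES (?, ?)", batch)
    conn.commit()
    conn.close()
```

A real job would swap in its own database driver and add retry handling; opening one connection per partition (rather than per record) is the point of the pattern.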
Re: Problem reading from S3 in standalone application
See here: https://wiki.apache.org/hadoop/AmazonS3

s3:// refers to a block storage system and is deprecated (http://docs.aws.amazon.com/ElasticMapReduce/latest/DeveloperGuide/emr-plan-file-systems.html). Use s3n:// for regular files you can see in the S3 web console.

Nick

On Wed, Aug 6, 2014 at 6:43 AM, sparkuser2345 hm.spark.u...@gmail.com wrote:

  Thanks, that works! In the REPL, I can successfully load the data using both s3:// and s3n://; why the difference?
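If job configs are littered with the deprecated scheme, a trivial migration helper (the function name is illustrative) keeps the rewrite in one place:

```python
def to_s3n(url):
    """Rewrite an s3:// URL to s3n://, leaving s3n:// and other schemes untouched."""
    return "s3n://" + url[len("s3://"):] if url.startswith("s3://") else url

to_s3n("s3://bucket-name/test_data.txt")  # -> 's3n://bucket-name/test_data.txt'
```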
[Streaming] updateStateByKey trouble
Hi folks, hoping someone who works with Streaming can help me out. I have the following snippet:

  val stateDstream = data.map(x => (x, 1))
                         .updateStateByKey[State](updateFunc)
  stateDstream.saveAsTextFiles(checkpointDirectory, "partitions_test")

where data is a stream of the case class StateKey(host: String, hour: String, customer: String). When I dump out the stream, I see duplicate values in the same partition (the two www.abcd.com keys below are identical):

  (StateKey(foo.com.br,2014-07-22-18,16),State(43,2014-08-06T14:05:29.831Z))
  (StateKey(www.abcd.com,2014-07-22-22,25),State(2564,2014-08-06T14:05:29.831Z))
  (StateKey(bar.com,2014-07-04-20,29),State(77,2014-08-06T14:05:29.831Z))
  (StateKey(www.abcd.com,2014-07-22-22,25),State(1117,2014-08-06T14:05:29.831Z))

I was under the impression that on each batch, the stream will contain a single RDD with key-value pairs reflecting the latest state of each key. Am I misunderstanding this? Or is the key equality somehow failing? Any tips on this appreciated...

PS. For completeness, State is case class State(val count: Integer, val update_date: DateTime)
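For reference, the stated expectation is right: updateStateByKey should yield exactly one state entry per distinct key, and a case class of plain Strings has well-defined value equality (stray whitespace in a key field, or a field type without value equality, would break that). A toy model of the contract in Python, with a namedtuple standing in for the case class (the update logic is a guess at the thread's updateFunc, which isn't shown):

```python
from collections import namedtuple

StateKey = namedtuple("StateKey", ["host", "hour", "customer"])

def update_func(new_values, old_count):
    # Fold this batch's counts into the existing state for the key.
    return (old_count or 0) + sum(new_values)

def run_batch(state, batch):
    """Toy updateStateByKey: group the batch by key, then apply update_func
    once per distinct key, so equal keys always collapse to one entry."""
    updates = {}
    for key, v in batch:
        updates.setdefault(key, []).append(v)
    for key, values in updates.items():
        state[key] = update_func(values, state.get(key))
    return state

state = {}
k = StateKey("www.abcd.com", "2014-07-22-22", "25")
run_batch(state, [(k, 1), (k, 1), (StateKey("bar.com", "2014-07-04-20", "29"), 1)])
# state ends with exactly one entry for k, holding the merged count 2
```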
Re: trouble with jsonRDD and jsonFile in pyspark
Nice catch Brad and thanks to Yin and Davies for getting on it so quickly. On Wed, Aug 6, 2014 at 2:45 AM, Davies Liu dav...@databricks.com wrote: There is a PR to fix this: https://github.com/apache/spark/pull/1802 On Tue, Aug 5, 2014 at 10:11 PM, Brad Miller bmill...@eecs.berkeley.edu wrote: I concur that printSchema works; it just seems to be operations that use the data where trouble happens. Thanks for posting the bug. -Brad On Tue, Aug 5, 2014 at 10:05 PM, Yin Huai yh...@databricks.com wrote: I tried jsonRDD(...).printSchema() and it worked. Seems the problem is when we take the data back to the Python side, SchemaRDD#javaToPython failed on your cases. I have created https://issues.apache.org/jira/browse/SPARK-2875 to track it. Thanks, Yin On Tue, Aug 5, 2014 at 9:20 PM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi All, I checked out and built master. Note that Maven had a problem building Kafka (in my case, at least); I was unable to fix this easily so I moved on since it seemed unlikely to have any influence on the problem at hand. Master improves functionality (including the example Nicholas just demonstrated) but unfortunately there still seems to be a bug related to using dictionaries as values. I've put some code below to illustrate the bug. 
# dictionary as value works fine print sqlCtx.jsonRDD(sc.parallelize(['{key0: {key1: value}}'])).collect() [Row(key0=Row(key1=u'value'))] # dictionary as value works fine, even when inner keys are varied print sqlCtx.jsonRDD(sc.parallelize(['{key0: {key1: value1}}', '{key0: {key2: value2}}'])).collect() [Row(key0=Row(key1=u'value1', key2=None)), Row(key0=Row(key1=None, key2=u'value2'))] # dictionary as value works fine when inner keys are missing and outer key is present print sqlCtx.jsonRDD(sc.parallelize(['{key0: {}}', '{key0: {key1: value1}}'])).collect() [Row(key0=Row(key1=None)), Row(key0=Row(key1=u'value1'))] # dictionary as value FAILS when outer key is missing print sqlCtx.jsonRDD(sc.parallelize(['{}', '{key0: {key1: value1}}'])).collect() Py4JJavaError: An error occurred while calling o84.collect. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 14 in stage 7.0 failed 4 times, most recent failure: Lost task 14.3 in stage 7.0 (TID 242, engelland.research.intel-research.net): java.lang.NullPointerException... # dictionary as value FAILS when outer key is present with null value print sqlCtx.jsonRDD(sc.parallelize(['{key0: null}', '{key0: {key1: value1}}'])).collect() Py4JJavaError: An error occurred while calling o98.collect. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 14 in stage 9.0 failed 4 times, most recent failure: Lost task 14.3 in stage 9.0 (TID 305, kunitz.research.intel-research.net): java.lang.NullPointerException... # nested lists work even when outer key is missing print sqlCtx.jsonRDD(sc.parallelize(['{}', '{key0: [[item0, item1], [item2, item3]]}'])).collect() [Row(key0=None), Row(key0=[[u'item0', u'item1'], [u'item2', u'item3']])] Is anyone able to replicate this behavior? -Brad On Tue, Aug 5, 2014 at 6:11 PM, Michael Armbrust mich...@databricks.com wrote: We try to keep master very stable, but this is where active development happens. 
YMMV, but a lot of people do run very close to master without incident (myself included). branch-1.0 has been cut for a while and we only merge bug fixes into it (this is more strict for non-alpha components like Spark core). For Spark SQL, this branch is pretty far behind, as the project is very young and we are fixing bugs / adding features very rapidly compared with Spark core. branch-1.1 was just cut and is being QAed for a release; at this point it's likely the same as master, but that will change as features start getting added to master in the coming weeks.

On Tue, Aug 5, 2014 at 5:38 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: collect() works, too.

sqlContext.jsonRDD(sc.parallelize(['{"foo": [[1,2,3], [4,5,6]]}', '{"foo": [[1,2,3], [4,5,6]]}'])).collect()
[Row(foo=[[1, 2, 3], [4, 5, 6]]), Row(foo=[[1, 2, 3], [4, 5, 6]])]

Can't answer your question about branch stability, though. Spark is a very active project, so stuff is happening all the time. Nick

On Tue, Aug 5, 2014 at 7:20 PM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi Nick, Can you check that the call to collect() works as well as printSchema()? I actually experience that printSchema() works fine, but then it crashes on collect(). In general, should I expect master (which seems to be on branch-1.1) to be any more/less stable than branch-1.0? While it would be great to have this fixed, it would be good to know if I should expect lots of other instability. best,
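Until the SPARK-2875 fix lands, one workaround consistent with Brad's observations above is to normalize records on the Python side so the outer key is always present as a dict: per his tests, `'{"key0": {}}'` works while `'{}'` and `'{"key0": null}'` fail. This sketch is mine, not from the thread; the `normalize` helper and the choice of `{}` as the default are assumptions:

```python
import json

def normalize(line, key="key0"):
    """Replace a missing or null outer key with an empty dict, turning
    the failing '{}' and '{"key0": null}' cases into the working
    '{"key0": {}}' case before the lines ever reach jsonRDD."""
    record = json.loads(line)
    if record.get(key) is None:
        record[key] = {}
    return json.dumps(record)

# Hypothetical usage on the RDD of JSON strings:
# fixed = sc.parallelize(lines).map(normalize)
# sqlCtx.jsonRDD(fixed).collect()
```

This only papers over the NullPointerException for a known outer key; the real fix is in the PR linked above.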
Re: fail to run LBFGS on 5G KDD data in spark 1.0.1?
Do you mind testing 1.1-SNAPSHOT and allocating more memory to the driver? I think the problem is with the feature dimension. KDD data has more than 20M features, and in v1.0.1 the driver collects the partial gradients one by one, sums them up, does the update, and then sends the new weights back to the executors one by one. In 1.1-SNAPSHOT, we switched to multi-level tree aggregation and torrent broadcasting. You can set the driver memory with spark-submit using `--driver-memory 30g`. It can be confirmed by visiting the storage tab in the WebUI. -Xiangrui

On Wed, Aug 6, 2014 at 1:58 AM, Lizhengbing (bing, BIPA) zhengbing...@huawei.com wrote:

1. I don't use spark-submit to run my problem; I use a SparkContext directly:

val conf = new SparkConf()
  .setMaster("spark://123d101suse11sp3:7077")
  .setAppName("LBFGS")
  .set("spark.executor.memory", "30g")
  .set("spark.akka.frameSize", "20")
val sc = new SparkContext(conf)

2. I use KDD data; the size is about 5G.

3. After I execute LBFGS.runLBFGS, at stage 7 the problem occurs:

14/08/06 16:44:45 INFO DAGScheduler: Failed to run aggregate at LBFGS.scala:201 Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 7.0:12 failed 4 times, most recent failure: TID 304 on host 123d103suse11sp3 failed for unknown reason Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1044) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1028) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1026) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1026) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634) at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:634) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1229) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
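To make Xiangrui's point concrete: the v1.0.1 behavior is effectively a flat fold of every partial gradient at the driver, while the 1.1-SNAPSHOT treeAggregate merges partial results level by level so no single step has to handle all of them. A toy, single-machine sketch of the idea only (not Spark's actual implementation; `fanout` and the list-of-partials representation are illustrative):

```python
from functools import reduce

def tree_aggregate(partials, combine, fanout=4):
    """Merge partial results in groups of `fanout`, repeating until one
    result remains. With 20M-dimensional gradients, this keeps any
    single combine step from touching every partial at once."""
    while len(partials) > 1:
        partials = [
            reduce(combine, partials[i:i + fanout])
            for i in range(0, len(partials), fanout)
        ]
    return partials[0]

# Toy stand-in for summing per-partition gradients:
grads = list(range(100))
total = tree_aggregate(grads, lambda a, b: a + b)
```

In Spark the intermediate levels run on executors, which is what relieves the driver's memory pressure described above.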
Re: Save an RDD to a SQL Database
Hi Vida, I am writing to a DB -- or trying to :). I believe the best practice for this (you can search the mailing list archives) is to use a combination of mapPartitions and a grouped iterator. Look at this thread, especially the comment from A. Boisvert and Matei's comment above it: https://groups.google.com/forum/#!topic/spark-users/LUb7ZysYp2k Basically, the short story is that you want to open as few connections as possible but write more than one insert at a time. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Save-an-RDD-to-a-SQL-Database-tp11516p11549.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
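A rough sketch of that pattern -- one connection per partition, batched writes -- under stated assumptions: the `connect`/`executemany` calls, table name, and batch size of 500 are placeholders for your actual DB driver, not something prescribed by the thread:

```python
from itertools import islice

def batches(rows, size):
    """Group an iterator into lists of at most `size` rows so each
    round trip to the database carries many inserts instead of one."""
    it = iter(rows)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk

def save_partition(rows):
    # One connection for the whole partition, many rows per statement.
    # `connect` and `executemany` stand in for your driver's real API:
    # conn = connect(DB_URL)
    # for chunk in batches(rows, 500):
    #     conn.executemany("INSERT INTO t VALUES (?, ?)", chunk)
    # conn.close()
    for chunk in batches(rows, 500):
        pass  # placeholder for the executemany call sketched above

# Hypothetical usage:
# rdd.foreachPartition(save_partition)
```

The point is exactly the one made above: connections are expensive, so open few of them, but amortize each one over many inserts.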
PySpark, numpy arrays and binary data
Hello, I'm interested in getting started with Spark to scale our scientific analysis package (http://pynbody.github.io) to larger data sets. The package is written in Python and makes heavy use of numpy/scipy and related frameworks. I've got a couple of questions that I have not been able to find easy answers to despite some research efforts... I hope someone here can clarify things for me a bit! * is there a preferred way to read binary data off a local disk directly into an RDD? Our I/O routines are built to read data in chunks and each chunk could be read by a different process/RDD, but it's not clear to me how to accomplish this with the existing API. Since the idea is to process data sets that don't fit into a single node's memory, reading first and then distributing via sc.parallelize is obviously not an option. * related to the first question -- when an RDD is created by parallelizing a numpy array, the array gets serialized and distributed. I see in the source that it actually gets written into a file first (!?) -- but surely the Py4J bottleneck for python array types (mentioned in the source comment) doesn't really apply to numpy arrays? Is it really necessary to dump the data onto disk first? Conversely, the collect() seems really slow and I suspect that this is due to the combination of disk I/O and python list creation. Are there any ways of getting around this if numpy arrays are being used? I'd be curious about any other best-practices tips anyone might have for running pyspark with numpy data...! Thanks! Rok
Regarding tooling/performance vs RedShift
My company is leaning towards moving much of their analytics work from our own Spark/Mesos/HDFS/Cassandra setup to RedShift. To date, I have been the internal advocate for using Spark for analytics, but a number of good points have been brought up to me. The reasons being pushed are:

- RedShift exposes a JDBC interface out of the box (no devops work there) and data looks and feels like it is in a normal SQL database. They want this out of the box from Spark, without trying to figure out which version matches which version of Hive/Shark/SparkSQL etc. Yes, the next release theoretically supports this, but there have been release issues our team has battled to date that erode the trust.
- Complaints around challenges we have faced running a spark shell locally against a cluster in EC2. It is partly a devops issue of deploying the correct configurations to local machines, being able to kick a user off who is hogging RAM, etc.
- "I want to be able to run queries from my python shell against your sequence file data, roll it up and in the same shell leverage python graph tools." I'm not very familiar with the Python setup, but I believe that by being able to run locally AND somehow add custom libraries to be accessed from PySpark, this could be done.
- Joins will perform much better (in RedShift) because it sorts its keys. We cannot pre-compute all joins away.

Basically, their argument is two-fold: 1) We get tooling out of the box from RedShift (specifically, stable JDBC access) - with Spark we often are waiting for devops to get the right combo of tools working or for libraries to support sequence files. 2) There is a belief that for many of our queries (assumed to often be joins) a columnar database will perform orders of magnitude better. Anyway, a test is being set up to compare the two on the performance side, but from a tools perspective it's hard to counter the issues that are brought up.
Re: Spark shell creating a local SparkContext instead of connecting to Spark Master
Thanks. This worked :). I am thinking I should add this in spark-env.sh so that spark-shell always connects to the master by default.

On Aug 6, 2014 12:04 AM, Akhil Das ak...@sigmoidanalytics.com wrote: You can always start your spark-shell by specifying the master as:

MASTER=spark://*whatever*:7077 $SPARK_HOME/bin/spark-shell

Then it will connect to that *whatever* master. Thanks Best Regards

On Tue, Aug 5, 2014 at 8:51 PM, Aniket Bhatnagar aniket.bhatna...@gmail.com wrote: Hi. Apologies if this is a noob question. I have set up Spark 1.0.1 on EMR using a slightly modified version of the script @ s3://elasticmapreduce/samples/spark/1.0.0/install-spark-shark-yarn.rb. It seems to be running fine, with master logs stating:

14/08/05 14:36:56 INFO Master: I have been elected leader! New state: ALIVE
14/08/05 14:37:21 INFO Master: Registering worker ip-10-0-2-80.ec2.internal:52029 with 2 cores, 6.3 GB RAM

The script has also created spark-env.sh under conf, which has the following content:

export SPARK_MASTER_IP=x.x.x.x
export SCALA_HOME=/home/hadoop/.versions/scala-2.10.3
export SPARK_LOCAL_DIRS=/mnt/spark/
export SPARK_CLASSPATH=/usr/share/aws/emr/emr-fs/lib/*:/usr/share/aws/emr/lib/*:/home/hadoop/share/hadoop/common/lib/*:/home/hadoop/.versions/2.4.0/share/hadoop/common/lib/hadoop-lzo.jar
export SPARK_DAEMON_JAVA_OPTS="-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
export SPARK_ASSEMBLY_JAR=/home/hadoop/spark/lib/spark-assembly-1.0.1-hadoop2.4.0.jar

However, when I run spark-shell, sc.isLocal returns true. Also, no matter how many RDDs I cache, the used memory in the master UI (x.x.x.x:7077) shows 0B used. This leads me to believe that spark-shell isn't connecting to the Spark master and has started a local instance of Spark. Is there something I am missing in my setup that allows spark-shell to connect to the master? Thanks, Aniket
Re: Regarding tooling/performance vs RedShift
1) We get tooling out of the box from RedShift (specifically, stable JDBC access) - Spark we often are waiting for devops to get the right combo of tools working or for libraries to support sequence files.

The arguments about JDBC access and simpler setup definitely make sense. My first non-trivial Spark application was actually an ETL process that sliced and diced JSON + tabular data and then loaded it into Redshift. From there on you got all the benefits of your average C-store database, plus the added benefit of Amazon managing many annoying setup and admin details for your Redshift cluster. One area I'm looking forward to seeing Spark SQL excel at is offering fast JDBC access to raw data--i.e. directly against S3 / HDFS; no ETL required. For easy and flexible data exploration, I don't think you can beat that with a C-store that you have to ETL stuff into.

2) There is a belief that for many of our queries (assumed to often be joins) a columnar database will perform orders of magnitude better.

This is definitely an "it depends" statement, but there is a detailed benchmark here: https://amplab.cs.berkeley.edu/benchmark/ comparing Shark, Redshift, and other systems. Have you seen it? Redshift does very well, but Shark is on par with or better than it in most of the tests. Of course, going forward we'll want to see Spark SQL match this kind of performance, and that remains to be seen. Nick

On Wed, Aug 6, 2014 at 12:06 PM, Gary Malouf malouf.g...@gmail.com wrote: My company is leaning towards moving much of their analytics work from our own Spark/Mesos/HDFS/Cassandra set up to RedShift. To date, I have been the internal advocate for using Spark for analytics, but a number of good points have been brought up to me. The reasons being pushed are: - RedShift exposes a jdbc interface out of the box (no devops work there) and data looks and feels like it is in a normal sql database. 
They want this out of the box from Spark, no trying to figure out which version matches this version of Hive/Shark/SparkSQL etc. Yes, the next release theoretically supports this but there have been release issues our team has battled to date that erode the trust. - Complaints around challenges we have faced running a spark shell locally against a cluster in EC2. It is partly a devops issue of deploying the correct configurations to local machines, being able to kick a user off hogging RAM, etc. - I want to be able to run queries from my python shell against your sequence file data, roll it up and in the same shell leverage python graph tools. - I'm not very familiar with the Python setup, but I believe by being able to run locally AND somehow add custom libraries to be accessed from PySpark this could be done. - Joins will perform much better (in RedShift) because it says it sorts it's keys. We cannot pre-compute all joins away. Basically, their argument is two-fold: 1) We get tooling out of the box from RedShift (specifically, stable JDBC access) - Spark we often are waiting for devops to get the right combo of tools working or for libraries to support sequence files. 2) There is a belief that for many of our queries (assumed to often be joins) a columnar database will perform orders of magnitude better. Anyway, a test is being setup to compare the two on the performance side but from a tools perspective it's hard to counter the issues that are brought up.
Re: PySpark, numpy arrays and binary data
numpy arrays only support basic types, so we cannot use them during collect() by default. Could you give a short example of how numpy arrays are used in your project?

On Wed, Aug 6, 2014 at 8:41 AM, Rok Roskar rokros...@gmail.com wrote: Hello, I'm interested in getting started with Spark to scale our scientific analysis package (http://pynbody.github.io) to larger data sets. The package is written in Python and makes heavy use of numpy/scipy and related frameworks. I've got a couple of questions that I have not been able to find easy answers to despite some research efforts... I hope someone here can clarify things for me a bit!

* is there a preferred way to read binary data off a local disk directly into an RDD? Our I/O routines are built to read data in chunks and each chunk could be read by a different process/RDD, but it's not clear to me how to accomplish this with the existing API. Since the idea is to process data sets that don't fit into a single node's memory, reading first and then distributing via sc.parallelize is obviously not an option.

If you already know how to partition the data, then you could use sc.parallelize() to distribute the description of your data, then read the data in parallel by the given descriptions. For example, you can partition your data into (path, start, length) tuples:

partitions = [(path1, start1, length), (path1, start2, length), ...]

def read_chunk(partition):
    path, start, length = partition
    f = open(path)
    f.seek(start)
    data = f.read(length)
    # process the data

rdd = sc.parallelize(partitions, len(partitions)).flatMap(read_chunk)

* related to the first question -- when an RDD is created by parallelizing a numpy array, the array gets serialized and distributed. I see in the source that it actually gets written into a file first (!?) -- but surely the Py4J bottleneck for python array types (mentioned in the source comment) doesn't really apply to numpy arrays? Is it really necessary to dump the data onto disk first? 
Conversely, the collect() seems really slow and I suspect that this is due to the combination of disk I/O and python list creation. Are there any ways of getting around this if numpy arrays are being used? I'd be curious about any other best-practices tips anyone might have for running pyspark with numpy data...! Thanks! Rok
Re: Spark Streaming fails - where is the problem?
Update: I can get it to work by disabling iptables temporarily. However, I cannot figure out which port I have to accept traffic on. 4040 and any of the Master or Worker ports mentioned in the previous post don't work. Can it be one of the randomly assigned ones in the 30k to 60k range? Those appear to change every time, making it difficult to apply any sensible rules. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-fails-where-is-the-problem-tp11355p11556.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
RE: Regarding tooling/performance vs RedShift
Just to point out that the benchmark you point to has Redshift running on HDD machines instead of SSD, and it is still faster than Shark in all but one case. Like Gary, I'm also interested in replacing something we have on Redshift with Spark SQL, as it will give me much greater capability to process things. I'm willing to sacrifice some performance for the greater capability. But it would be nice to see the benchmark updated with Spark SQL, and with a more competitive configuration of Redshift. Best regards, and keep up the great work! Ron

From: Nicholas Chammas [mailto:nicholas.cham...@gmail.com] Sent: Wednesday, August 06, 2014 9:30 AM To: Gary Malouf Cc: user Subject: Re: Regarding tooling/performance vs RedShift

1) We get tooling out of the box from RedShift (specifically, stable JDBC access) - Spark we often are waiting for devops to get the right combo of tools working or for libraries to support sequence files.

The arguments about JDBC access and simpler setup definitely make sense. My first non-trivial Spark application was actually an ETL process that sliced and diced JSON + tabular data and then loaded it into Redshift. From there on you got all the benefits of your average C-store database, plus the added benefit of Amazon managing many annoying setup and admin details for your Redshift cluster. One area I'm looking forward to seeing Spark SQL excel at is offering fast JDBC access to raw data--i.e. directly against S3 / HDFS; no ETL required. For easy and flexible data exploration, I don't think you can beat that with a C-store that you have to ETL stuff into.

2) There is a belief that for many of our queries (assumed to often be joins) a columnar database will perform orders of magnitude better.

This is definitely an "it depends" statement, but there is a detailed benchmark here: https://amplab.cs.berkeley.edu/benchmark/ comparing Shark, Redshift, and other systems. Have you seen it? 
Redshift does very well, but Shark is on par or better than it in most of the tests. Of course, going forward we'll want to see Spark SQL match this kind of performance, and that remains to be seen. Nick On Wed, Aug 6, 2014 at 12:06 PM, Gary Malouf malouf.g...@gmail.commailto:malouf.g...@gmail.com wrote: My company is leaning towards moving much of their analytics work from our own Spark/Mesos/HDFS/Cassandra set up to RedShift. To date, I have been the internal advocate for using Spark for analytics, but a number of good points have been brought up to me. The reasons being pushed are: - RedShift exposes a jdbc interface out of the box (no devops work there) and data looks and feels like it is in a normal sql database. They want this out of the box from Spark, no trying to figure out which version matches this version of Hive/Shark/SparkSQL etc. Yes, the next release theoretically supports this but there have been release issues our team has battled to date that erode the trust. - Complaints around challenges we have faced running a spark shell locally against a cluster in EC2. It is partly a devops issue of deploying the correct configurations to local machines, being able to kick a user off hogging RAM, etc. - I want to be able to run queries from my python shell against your sequence file data, roll it up and in the same shell leverage python graph tools. - I'm not very familiar with the Python setup, but I believe by being able to run locally AND somehow add custom libraries to be accessed from PySpark this could be done. - Joins will perform much better (in RedShift) because it says it sorts it's keys. We cannot pre-compute all joins away. Basically, their argument is two-fold: 1) We get tooling out of the box from RedShift (specifically, stable JDBC access) - Spark we often are waiting for devops to get the right combo of tools working or for libraries to support sequence files. 
2) There is a belief that for many of our queries (assumed to often be joins) a columnar database will perform orders of magnitude better. Anyway, a test is being setup to compare the two on the performance side but from a tools perspective it's hard to counter the issues that are brought up.
Re: can't submit my application on standalone spark cluster
Hi Andres, If you're using the EC2 scripts to start your standalone cluster, you can use ~/spark-ec2/copy-dir --delete ~/spark to sync your jars across the cluster. Note that you will need to restart the Master and the Workers afterwards through sbin/stop-all.sh and sbin/start-all.sh. If you're not using the EC2 scripts, you will have to rsync the directory manually (copy-dir just calls rsync internally). -Andrew

2014-08-06 2:39 GMT-07:00 Akhil Das ak...@sigmoidanalytics.com: Looks like a netty conflict there; most likely you are having multiple versions of netty jars (e.g. netty-3.6.6.Final.jar, netty-3.2.2.Final.jar, netty-all-4.0.13.Final.jar). You only require 3.6.6, I believe; a quick fix would be to remove the rest of them. Thanks Best Regards

On Wed, Aug 6, 2014 at 3:05 PM, Andres Gomez Ferrer ago...@redborder.net wrote: Hi all, My name is Andres and I'm starting to use Apache Spark. I try to submit my spark.jar to my cluster using this:

spark-submit --class net.redborder.spark.RedBorderApplication --master spark://pablo02:7077 redborder-spark-selfcontained.jar

But when I did it, my worker died... and my driver too!
This is my driver log: [INFO] 2014-08-06 06:30:12,025 [Driver-akka.actor.default-dispatcher-3] akka.event.slf4j.Slf4jLogger applyOrElse - Slf4jLogger started [INFO] 2014-08-06 06:30:12,061 [Driver-akka.actor.default-dispatcher-3] Remoting apply$mcV$sp - Starting remoting [ERROR] 2014-08-06 06:30:12,089 [Driver-akka.actor.default-dispatcher-6] akka.actor.ActorSystemImpl apply$mcV$sp - Uncaught fatal error from thread [Driver-akka.actor.default-dispatcher-3] shutting down ActorSystem [Driver] java.lang.VerifyError: (class: org/jboss/netty/channel/socket/nio/NioWorkerPool, method: createWorker signature: (Ljava/util/concurrent/Executor;)Lorg/jboss/netty/channel/socket/nio/AbstractNioWorker;) Wrong return type in function at akka.remote.transport.netty.NettyTransport.init(NettyTransport.scala:282) at akka.remote.transport.netty.NettyTransport.init(NettyTransport.scala:239) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source) at java.lang.reflect.Constructor.newInstance(Unknown Source) at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$2.apply(DynamicAccess.scala:78) at scala.util.Try$.apply(Try.scala:161) at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:73) at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84) at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84) at scala.util.Success.flatMap(Try.scala:200) at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:84) at akka.remote.EndpointManager$$anonfun$8.apply(Remoting.scala:618) at akka.remote.EndpointManager$$anonfun$8.apply(Remoting.scala:610) at scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:722) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721) at akka.remote.EndpointManager.akka$remote$EndpointManager$$listens(Remoting.scala:610) at akka.remote.EndpointManager$$anonfun$receive$2.applyOrElse(Remoting.scala:450) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [INFO] 2014-08-06 06:30:12,093 [Driver-akka.actor.default-dispatcher-5] akka.remote.RemoteActorRefProvider$RemotingTerminator apply$mcV$sp - Shutting down remote daemon. [INFO] 2014-08-06 06:30:12,095 [Driver-akka.actor.default-dispatcher-5] akka.remote.RemoteActorRefProvider$RemotingTerminator apply$mcV$sp - Remote daemon shut down; proceeding with flushing remote transports. [INFO] 2014-08-06 06:30:12,102 [Driver-akka.actor.default-dispatcher-3] Remoting apply$mcV$sp - Remoting shut down [INFO] 2014-08-06 06:30:12,104 [Driver-akka.actor.default-dispatcher-3] akka.remote.RemoteActorRefProvider$RemotingTerminator apply$mcV$sp - Remoting shut down.
Re: Submitting to a cluster behind a VPN, configuring different IP address
Hi, I'm having the exact same problem - I'm on a VPN and I'm trying to set the properties spark.httpBroadcast.uri and spark.fileserver.uri so that they bind to my VPN IP instead of my regular network IP. Were you ever able to get this working? Cheers, -Rob -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Submitting-to-a-cluster-behind-a-VPN-configuring-different-IP-address-tp9360p11560.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark Streaming fails - where is the problem?
Hi Simon, The drivers and executors currently choose random ports to talk to each other, so the Spark nodes will have to have full TCP access to each other. This is changed in a very recent commit, where all of these random ports will become configurable: https://github.com/apache/spark/commit/09f7e4587bbdf74207d2629e8c1314f93d865999. This will be available in Spark 1.1, but for now you will have to open all ports among the nodes in your cluster. -Andrew

2014-08-06 10:23 GMT-07:00 durin m...@simon-schaefer.net: Update: I can get it to work by disabling iptables temporarily. I can, however, not figure out on which port I have to accept traffic. 4040 and any of the Master or Worker ports mentioned in the previous post don't work. Can it be one of the randomly assigned ones in the 30k to 60k range? Those appear to change every time, making it difficult to apply any sensible rules. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-fails-where-is-the-problem-tp11355p11556.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
GraphX Pagerank application
I want to use pagerank on a 3GB text file, which contains a bipartite edge list with variables id and brand. Example:

id,brand
86246,15343
86246,27873
86246,14647
86246,55172
86246,3293
86246,2820
86246,3830
86246,2820
86246,5603
86246,72482

To perform the page rank I have to create a graph object, adding the edges by setting srcId = id and dstId = brand. In GraphLab there is a function: g = SGraph().add_edges(data, src_field='id', dst_field='brand') Is there something similar in GraphX? In the GraphX docs there is an example where a separate edge list and usernames are joined, but I couldn't find a use case for my problem. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-Pagerank-application-tp11562.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Running a Spark Shell locally against EC2
Hi Gary, This has indeed been a limitation of Spark, in that drivers and executors use random ephemeral ports to talk to each other. If you are submitting a Spark job from your local machine in client mode (meaning, the driver runs on your machine), you will need to open up all TCP ports from your worker machines, a requirement that is not super secure. However, a very recent commit changes this ( https://github.com/apache/spark/commit/09f7e4587bbdf74207d2629e8c1314f93d865999) in that you can now manually configure all ports and only open up the ones you configured. This will be available in Spark 1.1. -Andrew 2014-08-06 8:29 GMT-07:00 Gary Malouf malouf.g...@gmail.com: We have Spark 1.0.1 on Mesos deployed as a cluster in EC2. Our Devops lead tells me that Spark jobs can not be submitted from local machines due to the complexity of opening the right ports to the world etc. Are other people running the shell locally in a production environment?
Re: Running a Spark Shell locally against EC2
This will be awesome - it's been one of the major issues for our analytics team as they hope to use their own python libraries. On Wed, Aug 6, 2014 at 2:40 PM, Andrew Or and...@databricks.com wrote: Hi Gary, This has indeed been a limitation of Spark, in that drivers and executors use random ephemeral ports to talk to each other. If you are submitting a Spark job from your local machine in client mode (meaning, the driver runs on your machine), you will need to open up all TCP ports from your worker machines, a requirement that is not super secure. However, a very recent commit changes this ( https://github.com/apache/spark/commit/09f7e4587bbdf74207d2629e8c1314f93d865999) in that you can now manually configure all ports and only open up the ones you configured. This will be available in Spark 1.1. -Andrew 2014-08-06 8:29 GMT-07:00 Gary Malouf malouf.g...@gmail.com: We have Spark 1.0.1 on Mesos deployed as a cluster in EC2. Our Devops lead tells me that Spark jobs can not be submitted from local machines due to the complexity of opening the right ports to the world etc. Are other people running the shell locally in a production environment?
Re: issue with spark and bson input
Finally I made it work. The trick was in the asSubclass method:

val mongoRDD = sc.newAPIHadoopFile(
  "file:///root/jobs/dump/input.bson",
  classOf[BSONFileInputFormat].asSubclass(
    classOf[org.apache.hadoop.mapreduce.lib.input.FileInputFormat[Object, BSONObject]]),
  classOf[Object],
  classOf[BSONObject],
  config)

2014-08-06 0:43 GMT+04:00 Dmitriy Selivanov selivanov.dmit...@gmail.com: Hello, I have an issue when trying to use a bson file as spark input. I use mongo-hadoop-connector 1.3.0 and spark 1.0.0:

val sparkConf = new SparkConf()
val sc = new SparkContext(sparkConf)
val config = new Configuration()
config.set("mongo.job.input.format", "com.mongodb.hadoop.BSONFileInputFormat")
config.set("mapred.input.dir", "file:///root/jobs/dump/input.bson")
config.set("mongo.output.uri", "mongodb://" + args(0) + "/" + args(2))
val mongoRDD = sc.newAPIHadoopFile("file:///root/jobs/dump/input.bson",
  classOf[BSONFileInputFormat], classOf[Object], classOf[BSONObject], config)

But on the last line I receive an error: inferred type arguments [Object,org.bson.BSONObject,com.mongodb.hadoop.BSONFileInputFormat] do not conform to method newAPIHadoopFile's type parameter bounds [K,V,F <: org.apache.hadoop.mapreduce.InputFormat[K,V]] This is very strange, because BSONFileInputFormat extends org.apache.hadoop.mapreduce.lib.input.FileInputFormat: https://github.com/mongodb/mongo-hadoop/blob/master/core/src/main/java/com/mongodb/hadoop/BSONFileInputFormat.java How can I solve this issue? I have no problems with com.mongodb.hadoop.MongoInputFormat when I use a mongodb collection as input. And moreover there seems to be no problem with the java api: https://github.com/crcsmnky/mongodb-spark-demo/blob/master/src/main/java/com/mongodb/spark/demo/Recommender.java I'm not a professional java/scala developer, please help. -- Regards Dmitriy Selivanov -- Regards Dmitriy Selivanov
Re: Spark Streaming fails - where is the problem?
Hi Andrew, for this test I only have one machine which provides the master and only worker. So all I'd need is communication to the Internet to access the Twitter API. I've tried assigning a specific port to the driver and creating iptables rules for this port, but that didn't work. Best regards, Simon On Aug 6, 2014 11:37 AM, "Andrew Or-2 [via Apache Spark User List]" <ml-node+s1001560n11561...@n3.nabble.com> wrote: Hi Simon, The drivers and executors currently choose random ports to talk to each other, so the Spark nodes will have to have full TCP access to each other. This is changed in a very recent commit, where all of these random ports will become configurable: https://github.com/apache/spark/commit/09f7e4587bbdf74207d2629e8c1314f93d865999 . This will be available in Spark 1.1, but for now you will have to open all ports among the nodes in your cluster. -Andrew 2014-08-06 10:23 GMT-07:00 durin <[hidden email]>: Update: I can get it to work by disabling iptables temporarily. I can, however, not figure out on which port I have to accept traffic. 4040 and any of the Master or Worker ports mentioned in the previous post don't work. Can it be one of the randomly assigned ones in the 30k to 60k range? Those appear to change every time, making it difficult to apply any sensible rules. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-fails-where-is-the-problem-tp11355p11556.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
heterogeneous cluster hardware
I'm sure this must be a fairly common use case for Spark, yet I have not found a satisfactory discussion of it on the Spark website or forum: I work at a company with a lot of previous-generation server hardware sitting idle -- I want to add this hardware to my Spark cluster to increase performance! BUT: It is unclear whether the Spark master will be able to properly apportion jobs to the slaves if they have differing hardware specs. As I understand it, the default Spark launch scripts are incompatible with per-node hardware configurations, but it seems I could compose a custom spark-env.sh file for each slave to fully utilize its hardware. Would the master take these per-node configurations into consideration when allocating work? Or would the cluster necessarily fall to the lowest-common-hardware-denominator? Is this an area which needs development? I might be willing to look into attempting to introduce this functionality if it is lacking. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
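As far as I understand, the per-slave configuration described above is done through `conf/spark-env.sh` on each worker, which controls what that worker advertises to the master; the standalone master then schedules against whatever each worker reports, so it should not reduce everything to the weakest node. A hedged sketch — the numbers are made-up example specs:

```shell
# conf/spark-env.sh on one of the newer slaves (example values).
# Each worker advertises its own cores/memory to the master, so these
# values can differ from machine to machine in the same cluster.
export SPARK_WORKER_CORES=16
export SPARK_WORKER_MEMORY=48g
```

An older node would carry smaller values in its own copy of the file. Caveats like differing CPU speeds (as opposed to core counts) are not captured by this mechanism.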
Spark memory management
I have a few questions about managing Spark memory: 1) In a standalone setup, is there any CPU prioritization across users running jobs? If so, what is the behavior here? 2) With Spark 1.1, users will more easily be able to run drivers/shells from remote locations that do not cause firewall headaches. Is there a way to kill an individual user's job from the console without killing workers? We are on Mesos and are not aware of an easy way to handle this, but I imagine standalone mode may handle this.
Spark build error
Hi, I am trying to build jars using the command : mvn -Pyarn -Phadoop-2.2 -Dhadoop.version=2.2.0 -DskipTests clean package Execution of the above command is throwing the following error: [INFO] Spark Project Core . FAILURE [ 0.295 s] [INFO] Spark Project Bagel SKIPPED [INFO] Spark Project GraphX ... SKIPPED [INFO] Spark Project ML Library ... SKIPPED [INFO] Spark Project Streaming SKIPPED [INFO] Spark Project Tools SKIPPED [INFO] Spark Project Catalyst . SKIPPED [INFO] Spark Project SQL .. SKIPPED [INFO] Spark Project Hive . SKIPPED [INFO] Spark Project REPL . SKIPPED [INFO] Spark Project YARN Parent POM .. SKIPPED [INFO] Spark Project YARN Stable API .. SKIPPED [INFO] Spark Project Assembly . SKIPPED [INFO] Spark Project External Twitter . SKIPPED [INFO] Spark Project External Kafka ... SKIPPED [INFO] Spark Project External Flume ... SKIPPED [INFO] Spark Project External ZeroMQ .. SKIPPED [INFO] Spark Project External MQTT SKIPPED [INFO] Spark Project Examples . SKIPPED [INFO] [INFO] BUILD FAILURE [INFO] [INFO] Total time: 3.748 s [INFO] Finished at: 2014-08-07T01:00:48+05:30 [INFO] Final Memory: 24M/175M [INFO] [ERROR] Failed to execute goal org.apache.maven.plugins:maven-remote-resources-plugin:1.5:process (default) on project spark-core_2.10: Execution default of goal org.apache.maven.plugins:maven-remote-resources-plugin:1.5:process failed: For artifact {null:null:null:jar}: The groupId cannot be empty. - [Help 1] org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute goal org.apache.maven.plugins:maven-remote-resources-plugin:1.5:process (default) on project spark-core_2.10: Execution default of goal org.apache.maven.plugins:maven-remote-resources-plugin:1.5:process failed: For artifact {null:null:null:jar}: The groupId cannot be empty. 
at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:224) at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:153) at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:145) at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:116) at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:80) at org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build(SingleThreadedBuilder.java:51) at org.apache.maven.lifecycle.internal.LifecycleStarter.execute(LifecycleStarter.java:120) at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:347) at org.apache.maven.DefaultMaven.execute(DefaultMaven.java:154) at org.apache.maven.cli.MavenCli.execute(MavenCli.java:584) at org.apache.maven.cli.MavenCli.doMain(MavenCli.java:213) at org.apache.maven.cli.MavenCli.main(MavenCli.java:157) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced(Launcher.java:289) at org.codehaus.plexus.classworlds.launcher.Launcher.launch(Launcher.java:229) at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode(Launcher.java:415) at org.codehaus.plexus.classworlds.launcher.Launcher.main(Launcher.java:356) Caused by: org.apache.maven.plugin.PluginExecutionException: Execution default of goal org.apache.maven.plugins:maven-remote-resources-plugin:1.5:process failed: For artifact {null:null:null:jar}: The groupId cannot be empty. 
at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:143) at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:208) ... 19 more Caused by: org.apache.maven.artifact.InvalidArtifactRTException: For artifact {null:null:null:jar}: The groupId cannot be empty. Can someone help me with this?
Re: Unit Test for Spark Streaming
Thank you TD, I have worked around that problem and now the test compiles. However, I don't actually see that test running: when I do mvn test, it just says BUILD SUCCESS, without any TEST section on stdout. Are we supposed to use mvn test to run the test? Are there any other methods that can be used to run this test?
Re: Regarding tooling/performance vs RedShift
Forgot to cc the mailing list :) On Wed, Aug 6, 2014 at 3:41 PM, Daniel, Ronald (ELS-SDG) r.dan...@elsevier.com wrote: Agreed. Being able to use SQL to make a table, pass it to a graph algorithm, pass that output to a machine learning algorithm, being able to invoke user-defined python functions, … are capabilities that far exceed what we can do with Redshift. The total performance will be much better, and the programmer productivity will be much better, even if the SQL portion is not quite as fast. Mostly I was just objecting to "Redshift does very well, but Shark is on par or better than it in most of the tests" when that was not how I read the results, and Redshift was on HDDs. BTW – What are you doing w/ Spark? We have a lot of text and other content that we want to mine, and are shifting onto Spark so we have the greater capabilities mentioned above. Best regards, Ron Daniel, Jr. Director, Elsevier Labs r.dan...@elsevier.com mobile: +1 619 208 3064 *From:* Gary Malouf [mailto:malouf.g...@gmail.com] *Sent:* Wednesday, August 06, 2014 12:35 PM *To:* Daniel, Ronald (ELS-SDG) *Subject:* Re: Regarding tooling/performance vs RedShift Hi Ronald, In my opinion, the performance just has to be 'close' to make that piece irrelevant. I think the real issue comes down to tooling and the ease of connecting their various python tools from the office to results coming out of Spark/other solution in 'the cloud'. On Wed, Aug 6, 2014 at 1:43 PM, Daniel, Ronald (ELS-SDG) r.dan...@elsevier.com wrote: Just to point out that the benchmark you point to has Redshift running on HDD machines instead of SSD, and it is still faster than Shark in all but one case. Like Gary, I'm also interested in replacing something we have on Redshift with Spark SQL, as it will give me much greater capability to process things. I'm willing to sacrifice some performance for the greater capability.
But it would be nice to see the benchmark updated with Spark SQL, and with a more competitive configuration of Redshift. Best regards, and keep up the great work! Ron *From:* Nicholas Chammas [mailto:nicholas.cham...@gmail.com] *Sent:* Wednesday, August 06, 2014 9:30 AM *To:* Gary Malouf *Cc:* user *Subject:* Re: Regarding tooling/performance vs RedShift 1) We get tooling out of the box from RedShift (specifically, stable JDBC access) - Spark we often are waiting for devops to get the right combo of tools working or for libraries to support sequence files. The arguments about JDBC access and simpler setup definitely make sense. My first non-trivial Spark application was actually an ETL process that sliced and diced JSON + tabular data and then loaded it into Redshift. From there on you got all the benefits of your average C-store database, plus the added benefit of Amazon managing many annoying setup and admin details for your Redshift cluster. One area I'm looking forward to seeing Spark SQL excel at is offering fast JDBC access to raw data--i.e. directly against S3 / HDFS; no ETL required. For easy and flexible data exploration, I don't think you can beat that with a C-store that you have to ETL stuff into. 2) There is a belief that for many of our queries (assumed to often be joins) a columnar database will perform orders of magnitude better. This is definitely an "it depends" statement, but there is a detailed benchmark here https://amplab.cs.berkeley.edu/benchmark/ comparing Shark, Redshift, and other systems. Have you seen it? Redshift does very well, but Shark is on par or better than it in most of the tests. Of course, going forward we'll want to see Spark SQL match this kind of performance, and that remains to be seen. Nick On Wed, Aug 6, 2014 at 12:06 PM, Gary Malouf malouf.g...@gmail.com wrote: My company is leaning towards moving much of their analytics work from our own Spark/Mesos/HDFS/Cassandra set up to RedShift.
To date, I have been the internal advocate for using Spark for analytics, but a number of good points have been brought up to me. The reasons being pushed are: - RedShift exposes a jdbc interface out of the box (no devops work there) and data looks and feels like it is in a normal sql database. They want this out of the box from Spark, no trying to figure out which version matches this version of Hive/Shark/SparkSQL etc. Yes, the next release theoretically supports this but there have been release issues our team has battled to date that erode the trust. - Complaints around challenges we have faced running a spark shell locally against a cluster in EC2. It is partly a devops issue of deploying the correct configurations to local machines, being able to kick a user off hogging RAM, etc. - I want to be able to run queries from my python shell against your sequence file data, roll it up and in the same shell leverage python graph tools. - I'm not very
Re: Regarding tooling/performance vs RedShift
On Wed, Aug 6, 2014 at 3:41 PM, Daniel, Ronald (ELS-SDG) r.dan...@elsevier.com wrote: Mostly I was just objecting to "Redshift does very well, but Shark is on par or better than it in most of the tests" when that was not how I read the results, and Redshift was on HDDs. My bad. You are correct; the only test Shark (mem) does better on is test #1, Scan Query. And indeed, it would be good to see an updated benchmark with Redshift running on SSDs. Nick
Re: Regarding tooling/performance vs RedShift
Also, regarding something like Redshift not having MLlib built in, much of that could be done on the derived results. On Aug 6, 2014 4:07 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: On Wed, Aug 6, 2014 at 3:41 PM, Daniel, Ronald (ELS-SDG) r.dan...@elsevier.com wrote: Mostly I was just objecting to "Redshift does very well, but Shark is on par or better than it in most of the tests" when that was not how I read the results, and Redshift was on HDDs. My bad. You are correct; the only test Shark (mem) does better on is test #1, Scan Query. And indeed, it would be good to see an updated benchmark with Redshift running on SSDs. Nick
UpdateStateByKey - How to improve performance?
The method def updateStateByKey[S: ClassTag](updateFunc: (Seq[V], Option[S]) => Option[S]): DStream[(K, S)] takes a DStream[(K, V)] and produces a DStream[(K, S)] in Spark Streaming. We have an input DStream[(K, V)] that has 40,000 elements. We update on average 1,000 of them in every 3-second batch, but based on how this updateStateByKey function is defined, we are looping through 40,000 elements (Seq[V]) to make an update for just 1,000 elements and not updating the other 39,000. I think looping through the extra 39,000 elements is a waste of performance. Isn't there a better way to update this efficiently by just figuring out a hash map for the 1,000 elements that are required to be updated and just updating it (without looping through the unwanted elements)? Shouldn't there be a streaming update function provided that updates selective members, or are we missing some concepts here? I think updateStateByKey may be causing a lot of performance degradation in our app as we keep doing this again and again for every batch. Please let us know if my thought process is correct here.
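The asymmetry described above can be illustrated outside Spark with a plain dictionary. The following is a hedged sketch in plain Python, not Spark's implementation: one function visits every known key the way updateStateByKey's update function is invoked, the other touches only the keys that actually received new values.

```python
# State is a dict of key -> running total; a batch maps key -> list of new values.

def update_all_keys(state, batch):
    """What updateStateByKey effectively does: visit every known key,
    passing it the (possibly empty) list of new values for that key."""
    new_state = {}
    for key in set(state) | set(batch):
        new_values = batch.get(key, [])            # empty for 39,000 of 40,000 keys
        new_state[key] = state.get(key, 0) + sum(new_values)
    return new_state

def update_changed_keys_only(state, batch):
    """The hash-map alternative the poster asks for: touch only the keys
    that actually received new values in this batch."""
    new_state = dict(state)
    for key, new_values in batch.items():          # work proportional to the batch
        new_state[key] = new_state.get(key, 0) + sum(new_values)
    return new_state

state = {"a": 1, "b": 2, "c": 3}
batch = {"b": [10]}                                 # only one key has new data
assert update_all_keys(state, batch) == update_changed_keys_only(state, batch)
```

Both produce the same result; the second does work proportional to the batch rather than to the full key space. The trade-off is that a key with no new data is never visited, so it cannot be expired or timed out from inside the update function.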
RE: Regarding tooling/performance vs RedShift
Well yes, MLlib-like routines or pretty much anything else could be run on the derived results, but you have to unload the results from Redshift and then load them into some other tool. So it's nicer to leave them in memory and operate on them there. Major architectural advantage to Spark. Ron From: Gary Malouf [mailto:malouf.g...@gmail.com] Sent: Wednesday, August 06, 2014 1:17 PM To: Nicholas Chammas Cc: Daniel, Ronald (ELS-SDG); user@spark.apache.org Subject: Re: Regarding tooling/performance vs RedShift Also, regarding something like Redshift not having MLlib built in, much of that could be done on the derived results. On Aug 6, 2014 4:07 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: On Wed, Aug 6, 2014 at 3:41 PM, Daniel, Ronald (ELS-SDG) r.dan...@elsevier.com wrote: Mostly I was just objecting to "Redshift does very well, but Shark is on par or better than it in most of the tests" when that was not how I read the results, and Redshift was on HDDs. My bad. You are correct; the only test Shark (mem) does better on is test #1, Scan Query. And indeed, it would be good to see an updated benchmark with Redshift running on SSDs. Nick
Re: Writing to RabbitMQ
Yeah, I have observed this common problem in the design a number of times on the mailing list. Tobias, what you linked to is also an additional problem, one that occurs with mapPartitions but not with foreachPartition (which is relevant here). But I do get your point. I think there was an attempt made in that respect (a setup function) with RDD.mapWith and RDD.flatMapWith, but these got deprecated in 1.0. On Tue, Aug 5, 2014 at 9:54 AM, jschindler john.schind...@utexas.edu wrote: You are correct in that I am trying to publish inside of a foreachRDD loop. I am currently refactoring and will try publishing inside the foreachPartition loop. Below is the code showing the way it is currently written, thanks!

object myData {
  def main(args: Array[String]) {
    val ssc = new StreamingContext("local[8]", "Data", Seconds(10))
    ssc.checkpoint("checkpoint")
    val topicMap = Map("pagehit.data" -> 1)
    val factory = new ConnectionFactory()
    factory.setUsername("officialUsername")
    factory.setPassword("crypticPassword")
    factory.setVirtualHost("/")
    factory.setHost("rabbit-env")
    factory.setPort()
    val connection = factory.newConnection()
    val SQLChannel = connection.createChannel()
    SQLChannel.queueDeclare("SQLQueue", true, false, false, null)
    val Pipe = KafkaUtils.createStream(ssc, "Zookeeper_1,Zookeeper_1,Zookeeper_3", "Cons1", topicMap).map(_._2)
    // PARSE SOME JSON ETC
    windowStream.foreachRDD(pagehit => {
      val mongoClient = MongoClient("my-mongodb")
      val db = mongoClient("myClient")
      val SQLCollection = db("SQLCalls")
      val callArray = pagehit.map(_._1).collect
      val avg = (callArray.reduceLeft[Long](_ + _)) / callArray.length
      val URL = pagehit.take(1).map(_._2)
      SQLCollection += MongoDBObject("URL" -> URL(0).substring(7, URL(0).length - 1), "Avg Page Load Time" -> avg)
      val toBuildJSON = Seq(baseMsg, avg.toString, closingBrace)
      val byteArray = toBuildJSON.mkString.getBytes()
      SQLChannel.basicPublish("", "SQLQueue", null, byteArray)
    })
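The refactoring being discussed — moving connection creation off the driver and into the per-partition function — can be sketched generically. This is plain Python with a hypothetical `Connection` stub standing in for the RabbitMQ channel; it is not the pika or Java client API:

```python
# Sketch of the foreachPartition publishing pattern: the (non-serializable)
# connection is created inside the function that runs on the worker, once per
# partition, instead of once on the driver and then shipped to workers.

class Connection:
    """Hypothetical stand-in for a message-broker connection/channel."""
    def __init__(self):
        self.sent = []
    def publish(self, message):
        self.sent.append(message)
    def close(self):
        pass

def process_partition(records):
    conn = Connection()        # one connection per partition, opened on the worker
    try:
        for record in records:
            conn.publish(record)
    finally:
        conn.close()           # always release the connection, even on failure
    return conn.sent

# foreachPartition would invoke process_partition once per partition:
partitions = [["a", "b"], ["c"]]
results = [process_partition(p) for p in partitions]
```

This amortizes the connection cost over a whole partition instead of paying it per record, and avoids trying to serialize a live connection from the driver to the executors.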
Re: Regarding tooling/performance vs RedShift
On Wed, Aug 6, 2014 at 4:30 PM, Daniel, Ronald (ELS-SDG) r.dan...@elsevier.com wrote: Major architectural advantage to Spark. Amen to that. For a really cool and succinct demonstration of this, check out Aaron's demo http://youtu.be/sPhyePwo7FA?t=10m16s at the Hadoop Summit earlier this year where he combines SQL, machine learning, and stream processing using Spark. I don't think you can do this with any other platform. Nick
Re: SparkR : lapplyPartition transforms the data in vertical format
The output of lapply and lapplyPartition should be the same by design -- the only difference is that in lapply the user-defined function returns a row, while it returns a list in lapplyPartition. Could you give an example of a small input and the output that you expect to see for the above program? Shivaram On Wed, Aug 6, 2014 at 5:47 AM, Pranay Dave pranay.da...@gmail.com wrote: Hello As per the documentation, lapply works on single records and lapplyPartition works on partitions. However the format of the output does not change. When I use lapplyPartition, the data is converted to vertical format. Here is my code:

library(SparkR)
sc <- sparkR.init("local")
lines <- textFile(sc, "/sparkdev/datafiles/covariance.txt")
totals <- lapply(lines, function(lines) {
  sumx <- 0
  sumy <- 0
  totaln <- 0
  for (i in 1:length(lines)) {
    dataxy <- unlist(strsplit(lines[i], ","))
    sumx <- sumx + as.numeric(dataxy[1])
    sumy <- sumy + as.numeric(dataxy[2])
  }
  ## list(as.numeric(sumx), as.numeric(sumy), as.numeric(sumxy), as.numeric(totaln)) ## list does same as below
  c(sumx, sumy)
})
output <- collect(totals)
for (element in output) {
  cat(as.character(element[1]), as.character(element[2]), "\n")
}

I am expecting output as 55, 55 However it is giving 55,NA 55,NA Where am I going wrong? Thanks Pranay
Re: Spark streaming at-least once guarantee
Why can't you keep a persistent queue of S3 files to process? The process that you are running has two threads. Thread 1: continuously gets SQS messages and writes them to the queue. This queue is persisted to reliable storage, like HDFS / S3. Thread 2: peeks into the queue, and whenever there is any message, starts a Spark job to process the files. Once the Spark job is over, it dequeues that file from the queue. If this process fails in the middle of processing some files, then it can be restarted and thread 2 will start processing the files in the queue again (since they were not dequeued, as the jobs had not finished successfully). The file input stream in Spark Streaming essentially does this same thing. So you can either implement this directly, or you can subclass FileInputDStream and override the functions that find new files to process. There you can start a different thread that listens for and queues SQS messages. Then when the compute() method is called after every batch interval, the queued SQS messages should be processed and RDDs generated from the files. This new input DStream may be worth a try, but I feel that there will be corner cases which will be harder to deal with in this architecture, but which can be dealt with in the custom architecture I stated first. Corner cases like: what if an SQS message is downloaded but the process fails before creating RDDs out of the files (there could be a delay of a batch interval between those two in Spark Streaming)? When restarted, can you refetch the messages once again? Hope this helps! TD On Wed, Aug 6, 2014 at 12:13 AM, lalit1303 la...@sigmoidanalytics.com wrote: Hi TD, Thanks a lot for your reply :) I am already looking into creating a new DStream for SQS messages. It would be very helpful if you can provide some guidance regarding the same. The main motive of integrating SQS with Spark Streaming is to make my jobs run with high availability.
As of now I am having a downloader, which downloads the files pointed to by SQS messages, and then my Spark Streaming job comes into action to process them. I am planning to move the whole architecture to high availability (the Spark Streaming job can easily be shifted to high availability); the only piece left is integrating SQS with Spark Streaming such that it can automatically recover from master node failure. Also, I want to make a single pipeline, starting from getting the SQS message to the processing of the corresponding file. I couldn't think of any other approach to make my SQS downloader run in high availability mode. The only thing I have to get is a DStream which reads SQS messages from the corresponding queue. Please let me know if there is any other workaround. Thanks -- Lalit - Lalit Yadav la...@sigmoidanalytics.com
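TD's peek / process / dequeue-on-success design above can be sketched in a few lines. This is a hedged, minimal illustration in plain Python: the queue is persisted to a local JSON file rather than HDFS or S3, and the file names are hypothetical. The key property is that an entry is removed only after the job succeeds, so a crash between peek and dequeue just means the file is processed again (at-least-once semantics).

```python
import json
import os
import tempfile

class DurableQueue:
    """A file-backed FIFO queue; a real version would persist to HDFS/S3."""
    def __init__(self, path):
        self.path = path
        if not os.path.exists(path):
            self._write([])
    def _read(self):
        with open(self.path) as f:
            return json.load(f)
    def _write(self, items):
        with open(self.path, "w") as f:
            json.dump(items, f)
    def enqueue(self, item):    # thread 1: the SQS listener appends file pointers
        items = self._read()
        items.append(item)
        self._write(items)
    def peek(self):             # thread 2: look at the head without removing it
        items = self._read()
        return items[0] if items else None
    def dequeue(self):          # remove the head only AFTER the job succeeds
        items = self._read()
        head = items.pop(0)
        self._write(items)
        return head

path = os.path.join(tempfile.mkdtemp(), "queue.json")
q = DurableQueue(path)
q.enqueue("s3://bucket/file-1")   # hypothetical file from an SQS message
assert q.peek() == "s3://bucket/file-1"
# ... run the Spark job on the file; only on success:
q.dequeue()
assert q.peek() is None
```

If the process dies after `peek()` but before `dequeue()`, a restarted instance sees the same head entry and reprocesses it, which is exactly the recovery behavior described above.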
Re: UpdateStateByKey - How to improve performance?
Depending on the density of your keys, the alternative signature def updateStateByKey[S](updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)], partitioner: Partitioner, rememberPartitioner: Boolean)(implicit arg0: ClassTag[S]): DStream[(K, S)] at least iterates by key rather than by (old) value. I believe your thinking is correct that there might be a performance improvement opportunity for your case if there were an updateStateByKey() that instead iterated by (new) value. BTW, my impression from the stock examples is that the signature I pasted above was intended to be the more typically called updateStateByKey(), as opposed to the one you pasted, which my impression is the more general-purpose one. I have used the more general-purpose one, but only when I needed to peek into the entire set of states for some unusual reason. On Wednesday, August 6, 2014 2:30 PM, Venkat Subramanian vsubr...@gmail.com wrote: The method def updateStateByKey[S: ClassTag](updateFunc: (Seq[V], Option[S]) => Option[S]): DStream[(K, S)] takes a DStream (K,V) and produces a DStream (K,S) in Spark Streaming. We have an input Dstream(K,V) that has 40,000 elements. We update on average 1000 elements of them in every 3 second batch, but based on how this updateStateByKey function is defined, we are looping through 40,000 elements (Seq[V]) to make an update for just 1000 elements and not updating 39000 elements. I think looping through the extra 39000 elements is a waste of performance. Isn't there a better way to update this efficiently by just figuring out a hash map for the 1000 elements that are required to be updated and just updating it (without looping through the unwanted elements)? Shouldn't there be a Streaming update function provided that updates selective members or are we missing some concepts here? I think updateStateByKey may be causing lot of performance degradation in our app as we keep doing this again and again for every batch.
Please let us know if my thought process is correct here.
Re: UpdateStateByKey - How to improve performance?
Hello Venkat, Your thoughts are quite spot on. The current implementation was designed to allow the functionality of timing out a state. For this to be possible, the update function needs to be called for each key even if there is no new data, so that the function can check things like the last update time, etc., to time itself out and return None as the state. However, in Spark 1.2 I plan to improve on the performance for such scenarios as yours. For the time being, you could also try other techniques http://spark.apache.org/docs/latest/streaming-programming-guide.html#reducing-the-processing-time-of-each-batch for improving performance (if you haven't tried them already). You can also set the storage level of the dstream to non-serialized, which may improve perf. TD On Wed, Aug 6, 2014 at 1:29 PM, Venkat Subramanian vsubr...@gmail.com wrote: The method def updateStateByKey[S: ClassTag](updateFunc: (Seq[V], Option[S]) => Option[S]): DStream[(K, S)] takes a DStream (K,V) and produces a DStream (K,S) in Spark Streaming. We have an input Dstream(K,V) that has 40,000 elements. We update on average 1000 elements of them in every 3 second batch, but based on how this updateStateByKey function is defined, we are looping through 40,000 elements (Seq[V]) to make an update for just 1000 elements and not updating 39000 elements. I think looping through the extra 39000 elements is a waste of performance. Isn't there a better way to update this efficiently by just figuring out a hash map for the 1000 elements that are required to be updated and just updating it (without looping through the unwanted elements)? Shouldn't there be a Streaming update function provided that updates selective members or are we missing some concepts here? I think updateStateByKey may be causing lot of performance degradation in our app as we keep doing this again and again for every batch. Please let us know if my thought process is correct here.
Re: Spark memory management
Hey Gary, The answer to both of your questions is that much of it is up to the application. For (1), the standalone master can set spark.deploy.defaultCores to limit the number of cores each application can grab. However, the application can override this with the application-specific spark.cores.max, meaning there is currently nothing the master can do if the application is greedy and demands all the cores in the world. For (2), I am not aware of an existing way the standalone master can kill a user application. The most you can do is to go to the application SparkUI and kill the stages (there is a button), though this is not specific to standalone mode. There is currently a lot of trust between the standalone master and the application. Maybe this is not always a good thing. :) -Andrew 2014-08-06 12:23 GMT-07:00 Gary Malouf malouf.g...@gmail.com: I have a few questions about managing Spark memory: 1) In a standalone setup, is there any CPU prioritization across users running jobs? If so, what is the behavior here? 2) With Spark 1.1, users will more easily be able to run drivers/shells from remote locations that do not cause firewall headaches. Is there a way to kill an individual user's job from the console without killing workers? We are on Mesos and are not aware of an easy way to handle this, but I imagine standalone mode may handle this.
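For the first point above, a hedged sketch of how the master-side default is usually set — via SPARK_MASTER_OPTS in the master's spark-env.sh; the cap of 4 is an arbitrary example:

```shell
# conf/spark-env.sh on the standalone master (example value):
# applications that do not set spark.cores.max get at most 4 cores each.
export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=4"
```

As Andrew notes, an application can still override this for itself with spark.cores.max in its own configuration, which is exactly the trust problem he describes.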
Re: Unit Test for Spark Streaming
Does it not show the name of the test suite on stdout, showing that it has passed? Can you try writing a small unit test, in the same way as your Kafka unit test, with print statements to stdout ... to see whether it works? I believe it is some configuration issue in Maven, which is hard for me to guess. TD On Wed, Aug 6, 2014 at 12:53 PM, JiajiaJing jj.jing0...@gmail.com wrote: Thank you TD, I have worked around that problem and now the test compiles. However, I don't actually see that test running: when I do mvn test, it just says BUILD SUCCESS, without any TEST section on stdout. Are we supposed to use mvn test to run the test? Are there any other methods that can be used to run this test?
Re: Stopping StreamingContext does not kill receiver
I'm running on spark 1.0.0 and I see a similar problem when using the socketTextStream receiver. The ReceiverTracker task sticks around after a ssc.stop(false). -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Stopping-StreamingContext-does-not-kill-receiver-tp9522p11587.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark stream data from kafka topics and output as parquet file on HDFS
You can use Spark SQL for that very easily: convert the RDDs you get from the kafka input stream into RDDs of case classes and save them as parquet files. More information here: https://spark.apache.org/docs/latest/sql-programming-guide.html#parquet-files On Wed, Aug 6, 2014 at 5:23 AM, Mahebub Sayyed mahebub...@gmail.com wrote: Hello, I have referred to the link https://github.com/dibbhatt/kafka-spark-consumer and have successfully consumed tuples from kafka. The tuples are JSON objects, and I want to store those objects in HDFS in parquet format. Please suggest a sample example for that. Thanks in advance. On Tue, Aug 5, 2014 at 11:55 AM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: You can try this Kafka Spark Consumer, which I recently wrote. It uses the Low Level Kafka Consumer: https://github.com/dibbhatt/kafka-spark-consumer Dibyendu On Tue, Aug 5, 2014 at 12:52 PM, rafeeq s rafeeq.ec...@gmail.com wrote: Hi, I am new to Apache Spark and trying to develop a spark streaming program to *stream data from kafka topics and output it as a parquet file on HDFS*. Please share a *sample reference* program that streams data from kafka topics and outputs it as a parquet file on HDFS. Thanks in Advance. Regards, Rafeeq S *("What you do is what matters, not what you think or say or plan.")* -- *Regards,* *Mahebub Sayyed*
Re: Spark Streaming multiple streams problem
You probably have only 10 cores in your cluster on which you are executing your job. Since each DStream / receiver takes one core, the system is not able to start all of them and so everything is blocked. On Wed, Aug 6, 2014 at 3:08 AM, Laeeq Ahmed laeeqsp...@yahoo.com.invalid wrote: Hi, I am reading multiple streams from multiple ports with a single streaming context. I have created an array of DStreams. This works for up to 10 streams, but if I go over that (I have checked with 15 and 20 streams), the spark streaming task gets stuck. I waited for 10 minutes (2.2 min in the attached screenshot) and it still did not go through. The attached streaming UI shows where it gets stuck. If this is not the right way to read multiple streams, what is the alternative? I don't want to union the streams; I want to read them simultaneously in parallel.

object StreamAnomalyDetector {
  def calculate(sumOfSquare: Double, sumOfN: Double, n: Int): (Int, (Double, Double, Double)) = {
    val mean = sumOfN / n
    val variance = sumOfSquare / n - math.pow(mean, 2)
    (n, (mean, variance, math.sqrt(variance)))
  }

  def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println("Usage: StreamAnomalyDetector <master> <hostname> <port>")
      System.exit(1)
    }
    // Setting system properties
    // System.setProperty("spark.cores.max", "3")
    System.setProperty("spark.executor.memory", "5g")
    // Create the context
    val ssc = new StreamingContext(args(0), "StreamAnomalyDetector", Milliseconds(1000),
      System.getenv("SPARK_HOME"), List("target/scalaad-1.0-SNAPSHOT-jar-with-dependencies.jar"))
    // hdfs path to checkpoint old data
    ssc.checkpoint("hdfs://host-10-20-20-17.novalocal:9000/user/hduser/checkpointing/")
    // array for multiple streams
    val eegStreams = new Array[org.apache.spark.streaming.dstream.DStream[String]](args.length - 2)
    // Create the NetworkInputDStreams
    for (a <- 0 to (args.length - 3)) {
      // Multiple DStreams into the array
      eegStreams(a) = ssc.socketTextStream(args(1), args(a + 2).toInt, StorageLevel.MEMORY_AND_DISK_SER)
      val sums = eegStreams(a).map(x => (math.pow(x.toDouble, 2), x.toDouble, 1))
        .reduceByWindow(
          (a, b) => (a._1 + b._1, a._2 + b._2, a._3 + b._3),
          (a, b) => (a._1 - b._1, a._2 - b._2, a._3 - b._3),
          Seconds(4), Seconds(4))
      val meanAndSD = sums.map(x => calculate(x._1, x._2, x._3))
      meanAndSD.saveAsTextFiles("hdfs://host-10-20-20-17.novalocal:9000/user/hduser/output/" + (a + 1))
    }
    ssc.start()
    ssc.awaitTermination()
  }
}

Regards, Laeeq - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
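The arithmetic in the calculate function above is plain sum-of-squares statistics, independent of Spark. A minimal Python sketch of the same computation (function and variable names are mine, not from the thread), including how each element contributes a (x^2, x, 1) tuple that the windowed reduce sums pairwise:

```python
import math

def calculate(sum_of_squares, total, n):
    """Mean, variance and standard deviation from running sums,
    mirroring the Scala calculate() above: Var(X) = E[X^2] - E[X]^2."""
    mean = total / n
    variance = sum_of_squares / n - mean ** 2
    return n, (mean, variance, math.sqrt(variance))

# The windowed sums are what reduceByWindow maintains incrementally:
# each element x contributes (x^2, x, 1), and tuples are added pairwise.
values = [1.0, 2.0, 3.0, 4.0]
sums = (sum(x * x for x in values), sum(values), len(values))
n, (mean, variance, sd) = calculate(*sums)
# mean == 2.5, variance == 1.25
```

Because both the add and subtract functions operate on these running sums, the window never has to be rescanned from scratch.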
Re: Stopping StreamingContext does not kill receiver
Can you give the stack trace? This was the fix for the twitter stream. https://github.com/apache/spark/pull/1577/files You could try doing the same. TD On Wed, Aug 6, 2014 at 2:41 PM, lbustelo g...@bustelos.com wrote: I'm running on spark 1.0.0 and I see a similar problem when using the socketTextStream receiver. The ReceiverTracker task sticks around after a ssc.stop(false). -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Stopping-StreamingContext-does-not-kill-receiver-tp9522p11587.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spark Streaming + reduceByWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration
Hi, I have a DStream called eventData and it contains a set of Data objects defined as follows: case class Data(startDate: Long, endDate: Long, className: String, id: String, state: String) What would the reducer and inverse reducer functions look like if I wanted to add the data for the current 3 seconds and filter out the last 3 seconds of data? eventData.reduceByWindow(reduceFunc, invReduceFunc, Minutes(15), Seconds(3)) Thanks Ali -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-reduceByWindow-reduceFunc-invReduceFunc-windowDuration-slideDuration-tp11591.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Naive Bayes parameters
1) How is the minPartitions parameter in NaiveBayes example used? What is the default value? 2) Why is the numFeatures specified as a parameter? Can this not be obtained from the data? This parameter is not specified for the other MLlib algorithms. thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Naive-Bayes-parameters-tp11592.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark Streaming + reduceByWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration
Why isn't a simple window function sufficient? eventData.window(Minutes(15), Seconds(3)) will keep generating RDDs every 3 seconds, each containing the last 15 minutes of data. TD On Wed, Aug 6, 2014 at 3:43 PM, salemi alireza.sal...@udo.edu wrote: Hi, I have a DStream called eventData and it contains a set of Data objects defined as follows: case class Data(startDate: Long, endDate: Long, className: String, id: String, state: String) What would the reducer and inverse reducer functions look like if I wanted to add the data for the current 3 seconds and filter out the last 3 seconds of data? eventData.reduceByWindow(reduceFunc, invReduceFunc, Minutes(15), Seconds(3)) Thanks Ali -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-reduceByWindow-reduceFunc-invReduceFunc-windowDuration-slideDuration-tp11591.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
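For reference, the reduce/inverse-reduce pair Ali asked about works like a sliding sum: on each slide, the incoming batch is added and the batch falling out of the window is subtracted, so the full window is never rescanned. A Spark-free Python sketch of that mechanic (the batch values and function names are made up for illustration):

```python
from collections import deque

def sliding_reduce(batches, window_len, add, invert):
    """Yield one windowed aggregate per slide, updated incrementally:
    new_total = invert(add(old_total, incoming), outgoing)."""
    window = deque()
    total = 0
    for batch in batches:
        total = add(total, batch)       # reduceFunc: fold in the new batch
        window.append(batch)
        if len(window) > window_len:
            total = invert(total, window.popleft())  # invReduceFunc
        yield total

# Per-batch sums arriving one per slide; the window covers 3 batches.
batch_sums = [4, 1, 7, 2, 5]
out = list(sliding_reduce(batch_sums, 3,
                          add=lambda a, b: a + b,
                          invert=lambda a, b: a - b))
# out == [4, 5, 12, 10, 14]
```

This also shows why the inverse function must exactly undo the reduce function; otherwise the running total drifts as old batches leave the window.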
Trying to make sense of the actual executed code
Hi, I am trying to look at, for instance, the following SQL query in Spark 1.1: SELECT table.key, table.value, table2.value FROM table2 JOIN table WHERE table2.key = table.key When I look at the output, I see that there are several stages, and several tasks per stage. The tasks have a TID; I do not see such a thing for a stage. I see the input splits of the files and start, running and finished messages for the tasks. But what I really want to know is the following: which maps, shuffles and reduces are performed, and in which order? Where can I see the actual executed code per task/stage? Seeing the in-between files/RDDs would be a bonus! Thanks in advance, Tom -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Trying-to-make-sense-of-the-actual-executed-code-tp11594.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Using Python IDE for Spark Application Development
Hello, I am trying to use the python IDE PyCharm for Spark application development. How can I use pyspark with Python IDE? Can anyone help me with this? Thanks Sathish
Re: Using Python IDE for Spark Application Development
My naive set up: adding os.environ['SPARK_HOME'] = "/path/to/spark" and sys.path.append("/path/to/spark/python") at the top of my script. from pyspark import SparkContext from pyspark import SparkConf Execution works from within pycharm... Though my next step is to figure out autocompletion, and I bet there are better ways to develop apps for spark.. On Wed, Aug 6, 2014 at 4:16 PM, Sathish Kumaran Vairavelu vsathishkuma...@gmail.com wrote: Hello, I am trying to use the python IDE PyCharm for Spark application development. How can I use pyspark with a Python IDE? Can anyone help me with this? Thanks Sathish -- Mohit When you want success as badly as you want the air, then you will get it. There is no other secret of success. -Socrates
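Mohit's snippet above, with the quoting made explicit, can be written as a runnable fragment. The path is a placeholder for an actual Spark install; importing pyspark itself is left commented since it requires Spark on disk:

```python
import os
import sys

# Hypothetical install location -- point this at your actual Spark directory.
SPARK_HOME = "/path/to/spark"

# Make the pyspark package importable from a plain Python interpreter / IDE.
os.environ["SPARK_HOME"] = SPARK_HOME
sys.path.append(os.path.join(SPARK_HOME, "python"))

# With the path set up, these imports resolve (they still need a real
# Spark install to actually create a context and run a job):
# from pyspark import SparkConf, SparkContext
```

Pointing the IDE's project interpreter at the same `python` directory is what usually makes autocompletion work as well.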
Hive 0.11 / CDH 4.6 / Spark 0.9.1 dilemma
I posted this on the cdh-user mailing list yesterday and think this should be the right audience for it: = Hi All, Not sure if anyone else has faced this same issue or not. We installed CDH 4.6, which uses Hive 0.10, and we have Spark 0.9.1, which comes with Hive 0.11. Now our hive jobs that work on CDH fail in Shark. Is anyone else facing the same issues, and are there any work-arounds? Can we re-compile Shark 0.9.1 with Hive 0.10, or compile Hive 0.11 on CDH 4.6? Thanks, Anurag Tangri
Re: Hive 0.11 / CDH 4.6 / Spark 0.9.1 dilemma
I haven't tried any of this, mind you, but my guess is that your options, from least painful and most likely to work onwards, are: - Get Spark / Shark to compile against Hive 0.10 - Shade Hive 0.11 into Spark - Update to CDH 5.0+ I don't think there will be more updated releases of Shark or Spark-on-CDH4, so you may want to be moving forward anyway. On Thu, Aug 7, 2014 at 12:46 AM, Anurag Tangri atan...@groupon.com wrote: I posted this on the cdh-user mailing list yesterday and think this should be the right audience for it: = Hi All, Not sure if anyone else has faced this same issue or not. We installed CDH 4.6, which uses Hive 0.10, and we have Spark 0.9.1, which comes with Hive 0.11. Now our hive jobs that work on CDH fail in Shark. Is anyone else facing the same issues, and are there any work-arounds? Can we re-compile Shark 0.9.1 with Hive 0.10, or compile Hive 0.11 on CDH 4.6? Thanks, Anurag Tangri
Re: RDD to DStream
Hey Aniket, Great thoughts! I understand the use case. But as you have realized yourself, it is not trivial to cleanly stream an RDD as a DStream. Since RDD operations are defined to be scan based, it is not efficient to define an RDD based on slices of data within a partition of another RDD using pure RDD transformations. What you have done is a decent, and probably the only feasible, solution, with its limitations. Also, the requirements for converting a batch of data into a stream of data can be pretty diverse: what rate, how many events per batch, how many batches, is it efficient? Hence, it is not trivial to define a good, clean public API for that. If anyone has any thoughts, ideas, etc. on this, you are more than welcome to share them. TD On Mon, Aug 4, 2014 at 12:43 AM, Aniket Bhatnagar aniket.bhatna...@gmail.com wrote: The use case for converting an RDD into a DStream is that I want to simulate a stream from already persisted data for testing analytics. It is trivial to create an RDD from any persisted data, but not so much for a DStream. Hence my idea to create a DStream from an RDD. For example, let's say you are trying to implement analytics on time series data using the Lambda architecture. This means you would have to implement the same analytics on streaming data (in streaming mode) as well as on persisted data (in batch mode). The workflow for implementing the analytics would be to first implement them in batch mode using RDD operations and then simulate a stream to test the analytics in stream mode. The simulated stream should produce the elements at a specified rate. So the solution may be to read data into an RDD, split (chunk) it into multiple RDDs, with each RDD holding the number of elements that need to be streamed per time unit, and then finally stream each RDD using the compute function. The problem with using QueueInputDStream is that it will stream data as per the batch duration specified in the streaming context, and one cannot specify a custom slide duration.
Moreover, the class QueueInputDStream is private to the streaming package, so I can't really use it or extend it from an external package. Also, I could not find a good solution to split an RDD into equal sized smaller RDDs that can be fed into an extended version of QueueInputDStream. Finally, here is what I came up with:

class RDDExtension[T: ClassTag](rdd: RDD[T]) {
  def toStream(streamingContext: StreamingContext, chunkSize: Int,
      slideDurationMilli: Option[Long] = None): DStream[T] = {
    new InputDStream[T](streamingContext) {
      // WARNING: each partition must fit in the RAM of the local machine.
      private val iterator = rdd.toLocalIterator
      private val grouped = iterator.grouped(chunkSize)

      override def start(): Unit = {}
      override def stop(): Unit = {}

      override def compute(validTime: Time): Option[RDD[T]] = {
        if (grouped.hasNext) {
          Some(rdd.sparkContext.parallelize(grouped.next()))
        } else {
          None
        }
      }

      override def slideDuration = {
        slideDurationMilli.map(duration => new Duration(duration))
          .getOrElse(super.slideDuration)
      }
    }
  }
}

This aims to stream chunkSize elements every slideDurationMilli milliseconds (defaulting to the batch size of the streaming context). It's still not perfect (for example, the streaming is not precise), but given that this will only be used for testing purposes, I am not looking for ways to further optimize it. Thanks, Aniket On 2 August 2014 04:07, Mayur Rustagi mayur.rust...@gmail.com wrote: Nice question :) Ideally you should use a queuestream interface to push RDDs into a queue; then spark streaming can handle the rest. Though why are you looking to convert an RDD to a DStream? Another workaround folks use is to source the DStream from folders and move the files that need reprocessing back into the folder. It's a hack, but much less of a headache.
Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Fri, Aug 1, 2014 at 10:21 AM, Aniket Bhatnagar aniket.bhatna...@gmail.com wrote: Hi everyone I haven't been receiving replies to my queries on the distribution list. Not pissed, but I am actually curious to know whether my messages are actually going through or not. Can someone please confirm that my msgs are getting delivered via this distribution list? Thanks, Aniket On 1 August 2014 13:55, Aniket Bhatnagar aniket.bhatna...@gmail.com wrote: Sometimes it is useful to convert an RDD into a DStream for testing purposes (generating DStreams from historical data, etc). Is there an easy way to do this? I could come up with the following inefficient way, but I am not sure if there is a better way to achieve this. Thoughts? class RDDExtension[T](rdd: RDD[T]) { def chunked(chunkSize: Int): RDD[Seq[T]] = { rdd.mapPartitions(partitionItr => partitionItr.grouped(chunkSize)) } def skipFirst(): RDD[T] = {
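The core trick in Aniket's toStream above, pulling the RDD through a local iterator and grouping it into fixed-size chunks, is easy to see in isolation. A Spark-free Python sketch of the chunking step (function names are mine, for illustration):

```python
def chunked(iterable, chunk_size):
    """Group an iterator into lists of at most chunk_size elements,
    like Scala's Iterator.grouped used in toStream above."""
    chunk = []
    for item in iterable:
        chunk.append(item)
        if len(chunk) == chunk_size:
            yield chunk
            chunk = []
    if chunk:  # final, possibly short, chunk
        yield chunk

# Each chunk would become one streamed "batch" (one RDD per slide).
batches = list(chunked(range(7), 3))
# batches == [[0, 1, 2], [3, 4, 5], [6]]
```

In the DStream version, compute() emits one such chunk per slide duration until the iterator is exhausted, at which point it returns None.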
Re: Trying to make sense of the actual executed code
This is maybe not exactly what you are asking for, but you might consider looking at queryExecution (a developer API that shows how the query is analyzed / executed): sql(...).queryExecution On Wed, Aug 6, 2014 at 3:55 PM, Tom thubregt...@gmail.com wrote: Hi, I am trying to look at, for instance, the following SQL query in Spark 1.1: SELECT table.key, table.value, table2.value FROM table2 JOIN table WHERE table2.key = table.key When I look at the output, I see that there are several stages, and several tasks per stage. The tasks have a TID; I do not see such a thing for a stage. I see the input splits of the files and start, running and finished messages for the tasks. But what I really want to know is the following: which maps, shuffles and reduces are performed, and in which order? Where can I see the actual executed code per task/stage? Seeing the in-between files/RDDs would be a bonus! Thanks in advance, Tom -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Trying-to-make-sense-of-the-actual-executed-code-tp11594.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Fwd: Trying to make sense of the actual executed code
(Forgot to include the mailing list in my reply. Here it is.) Hi, On Thu, Aug 7, 2014 at 7:55 AM, Tom thubregt...@gmail.com wrote: When I look at the output, I see that there are several stages, and several tasks per stage. The tasks have a TID, I do not see such a thing for a stage. They should have one. In my logs, for example, I see something like INFO scheduler.DAGScheduler - Submitting Stage 1 (MapPartitionsRDD[4] at reduceByKey at SimpleSpark.scala:21), which has no missing parents INFO scheduler.DAGScheduler - Submitting Stage 0 (MapPartitionsRDD[6] at reduceByKey at SimpleSpark.scala:21), which is now runnable But what I really want to know is the following: Which map, shuffle and reduces are performed in which order/where can I see the actual executed code per task/stage. In between files/rdd's would be a bonus! I would also be interested in that, although I think it's quite hard to understand what is actually being executed. I dug into it a bit yesterday, and even the simple WordCount (flatMap, map, reduceByKey, max) is already quite tough to understand. For example, reduceByKey consists of three transformations (a local reduceByKey, a repartition by key, and another local reduceByKey), one of which happens in one stage and the other two in a different stage. I would love to see a good visualization of that (I wonder how the developers got their heads around it without such a tool), but I am not aware of any. Tobias
Re: Stopping StreamingContext does not kill receiver
I narrowed down the error. Unfortunately this is not a quick fix. I have opened a JIRA for it. https://issues.apache.org/jira/browse/SPARK-2892 On Wed, Aug 6, 2014 at 3:59 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Okay, let me give it a shot. On Wed, Aug 6, 2014 at 3:57 PM, lbustelo g...@bustelos.com wrote: Sorry about the screenshot… but that is what I have handy right now. You can see that we get a WARN and it ultimately says that it stopped successfully. When looking at the application in the Spark UI, it still shows the ReceiverTracker task running. It is easy to recreate. In the spark repl we are running a modified version of https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala . Then do a ssc.stop(false). http://apache-spark-user-list.1001560.n3.nabble.com/file/n11595/Screen_Shot_2014-08-06_at_4.png -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Stopping-StreamingContext-does-not-kill-receiver-tp9522p11595.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Regularization parameters
Hi, That is interesting. Would you please share some code showing how you are setting the regularization type and regularization parameters and running Logistic Regression? Thanks, Burak - Original Message - From: SK skrishna...@gmail.com To: u...@spark.incubator.apache.org Sent: Wednesday, August 6, 2014 6:18:43 PM Subject: Regularization parameters Hi, I tried different regularization parameter values with Logistic Regression for binary classification of my dataset and would like to understand the following results: regType = L2, regParam = 0.0: I am getting AUC = 0.80 and accuracy of 80% regType = L1, regParam = 0.0: I am getting AUC = 0.80 and accuracy of 50% To calculate accuracy I am using 0.5 as the threshold: prediction < 0.5 is class 0, and prediction >= 0.5 is class 1. regParam = 0.0 implies I am not using any regularization, is that correct? If so, it should not matter whether I specify L1 or L2; I should get the same results. So why is the accuracy value different? thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Regularization-parameters-tp11601.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
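SK's reasoning about regParam = 0.0 can be checked directly: the penalty term is proportional to regParam times the L1 norm (sum of |w|) or the squared L2 norm (sum of w^2) of the weights, so at regParam = 0 both penalties vanish and the two regularized objectives coincide. A small Python sketch (helper names are mine):

```python
def l1_penalty(weights, reg_param):
    # L1 (lasso) penalty: reg_param * sum |w_i|
    return reg_param * sum(abs(w) for w in weights)

def l2_penalty(weights, reg_param):
    # L2 (ridge) penalty: reg_param * sum w_i^2
    return reg_param * sum(w * w for w in weights)

w = [0.5, -1.2, 3.0]
# With regParam = 0.0 the two regularized objectives differ by nothing,
# so choosing L1 vs L2 should not change the optimum; a difference in
# results points elsewhere (e.g. the solver taking a different path).
assert l1_penalty(w, 0.0) == l2_penalty(w, 0.0) == 0.0
```

This supports SK's expectation: with a zero regularization parameter, the accuracy gap has to come from something other than the penalty itself.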
PySpark + executor lost
Hi, I get a lot of executor lost error for saveAsTextFile with PySpark and Hadoop 2.4. For small datasets this error occurs but since the dataset is small it gets eventually written to the file. For large datasets, it takes forever to write the final output. Any help is appreciated. Avishek - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Using Python IDE for Spark Application Development
Mohit, This doesn't seem to be working; can you please provide more details? When I use from pyspark import SparkContext, it is disabled in pycharm. I use the pycharm community edition. Where should I set the environment variables: in the same python script or in a different one? Also, should I run a local Spark cluster so the Spark program runs on top of it? Appreciate your help. -Sathish On Wed, Aug 6, 2014 at 6:22 PM, Mohit Singh mohit1...@gmail.com wrote: My naive set up: adding os.environ['SPARK_HOME'] = "/path/to/spark" and sys.path.append("/path/to/spark/python") at the top of my script. from pyspark import SparkContext from pyspark import SparkConf Execution works from within pycharm... Though my next step is to figure out autocompletion, and I bet there are better ways to develop apps for spark.. On Wed, Aug 6, 2014 at 4:16 PM, Sathish Kumaran Vairavelu vsathishkuma...@gmail.com wrote: Hello, I am trying to use the python IDE PyCharm for Spark application development. How can I use pyspark with a Python IDE? Can anyone help me with this? Thanks Sathish -- Mohit When you want success as badly as you want the air, then you will get it. There is no other secret of success. -Socrates
memory issue on standalone master
Hi There, I'm starting to use spark and have hit a rookie problem. I used standalone mode with the master only, and here is what I did: ./sbin/start-master.sh ./bin/pyspark When I tried the wordcount.py example, whose input file is a bit big, I got the out of memory error excerpted and pasted below. I have 60G RAM; in my log file, I found this: Spark Command: java -cp ::/home/ubuntu/spark/conf:/home/ubuntu/spark/assembly/target/scala-2.10/spark-assembly-1.1.0-SNAPSHOT-hadoop1.0.4.jar -XX:MaxPermSize=128m -Dspark.akka.logLifecycleEvents=true -Xms512m -Xmx512m org.apache.spark.deploy.master.Master --ip ip-10-123-146-183 --port 7077 --webui-port 8080 Any help please? .. 14/08/07 02:47:06 INFO PythonRDD: Times: total = 7008, boot = 10, init = 106, finish = 6892 14/08/07 02:47:06 ERROR Executor: Exception in task 2.0 in stage 0.0 (TID 122) java.lang.OutOfMemoryError: Java heap space at com.esotericsoftware.kryo.io.Output.require(Output.java:142) at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220) at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:206) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:29) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:18) at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:312) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:148) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:209) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) 14/08/07 02:47:06 ERROR Executor: Exception in task 7.0 in stage 0.0 (TID 127) java.lang.OutOfMemoryError: Java heap space at com.esotericsoftware.kryo.io.Output.require(Output.java:142) at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220) at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:206) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:29) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:18) at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:312) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:148) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:209) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/memory-issue-on-standalone-master-tp11610.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
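Note that the -Xms512m -Xmx512m in the log above is the heap of the master daemon itself, while the OutOfMemoryError is thrown inside an Executor. Assuming the fix is to give executors more heap (an illustrative sketch, not an answer from the thread; the 8g value is made up):

```
# conf/spark-defaults.conf (illustrative value)
spark.executor.memory   8g
```

Equivalently, the property can be set on the SparkConf before the SparkContext is created, e.g. `SparkConf().set("spark.executor.memory", "8g")` in PySpark.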
Re: Regularization parameters
One possible straightforward explanation might be that your solution(s) are stuck in a local minimum? And depending on your weight initialization, you are getting different parameters. Maybe use the same initial weights for both runs... or I would probably test the execution with a synthetic dataset that has a known global solution? On Wed, Aug 6, 2014 at 7:12 PM, Burak Yavuz bya...@stanford.edu wrote: Hi, That is interesting. Would you please share some code showing how you are setting the regularization type and regularization parameters and running Logistic Regression? Thanks, Burak - Original Message - From: SK skrishna...@gmail.com To: u...@spark.incubator.apache.org Sent: Wednesday, August 6, 2014 6:18:43 PM Subject: Regularization parameters Hi, I tried different regularization parameter values with Logistic Regression for binary classification of my dataset and would like to understand the following results: regType = L2, regParam = 0.0: I am getting AUC = 0.80 and accuracy of 80% regType = L1, regParam = 0.0: I am getting AUC = 0.80 and accuracy of 50% To calculate accuracy I am using 0.5 as the threshold: prediction < 0.5 is class 0, and prediction >= 0.5 is class 1. regParam = 0.0 implies I am not using any regularization, is that correct? If so, it should not matter whether I specify L1 or L2; I should get the same results. So why is the accuracy value different? thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Regularization-parameters-tp11601.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Mohit When you want success as badly as you want the air, then you will get it. There is no other secret of success. -Socrates
Re: Spark Streaming + reduceByWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration
Hi, The reason I am looking to do it differently is that the latency and batch processing times are bad, about 40 sec (I took the times from the Streaming UI). As you suggested, I tried the window as below and the times are still bad. val dStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap) val eventData = dStream.map(_._2).map(_.split(",")).map(data => Data(data(0), data(1), data(2), data(3), data(4))).window(Minutes(15), Seconds(3)) val result = eventData.transform((rdd, time) => { rdd.registerAsTable("data") sql("SELECT count(state) FROM data WHERE state='Active'") }) result.print() Any suggestions? Ali -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-reduceByWindow-reduceFunc-invReduceFunc-windowDuration-slideDuration-tp11591p11612.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Column width limits?
Assume I want to make a PairRDD whose keys are S3 URLs and whose values are Strings holding the contents of those (UTF-8) files, but NOT split into lines. Are there length limits on those files/Strings? 1 MB? 16 MB? 4 GB? 1 TB? Similarly, can such a thing be registered as a table so that I can use substr() to pick out pieces of the string? Thanks, Ron
Re: fail to run LBFGS on 5G KDD data in spark 1.0.1?
I have tested it in spark-1.1.0-SNAPSHOT. It is OK now. From: Xiangrui Meng [mailto:men...@gmail.com] Sent: August 6, 2014 23:12 To: Lizhengbing (bing, BIPA) Cc: user@spark.apache.org Subject: Re: fail to run LBFGS on 5G KDD data in spark 1.0.1? Do you mind testing 1.1-SNAPSHOT and allocating more memory to the driver? I think the problem is with the feature dimension. KDD data has more than 20M features, and in v1.0.1 the driver collects the partial gradients one by one, sums them up, does the update, and then sends the new weights back to the executors one by one. In 1.1-SNAPSHOT, we switched to multi-level tree aggregation and torrent broadcasting. For the driver memory, you can set it with spark-submit using `--driver-memory 30g`. It can be confirmed by visiting the storage tab in the WebUI. -Xiangrui On Wed, Aug 6, 2014 at 1:58 AM, Lizhengbing (bing, BIPA) zhengbing...@huawei.com wrote: 1. I don't use spark-submit to run my problem; I use a spark context directly: val conf = new SparkConf() .setMaster("spark://123d101suse11sp3:7077") .setAppName("LBFGS") .set("spark.executor.memory", "30g") .set("spark.akka.frameSize", "20") val sc = new SparkContext(conf) 2. I use KDD data, about 5G in size. 3. After I execute LBFGS.runLBFGS, at stage 7 the problem occurs: 14/08/06 16:44:45 INFO DAGScheduler: Failed to run aggregate at LBFGS.scala:201 Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 7.0:12 failed 4 times, most recent failure: TID 304 on host 123d103suse11sp3 failed for unknown reason Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1044) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1028) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1026)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1026)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:634)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1229)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
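Xiangrui's explanation above (the v1.0.1 driver summing 20M-dimensional partial gradients one executor at a time) is the bottleneck that `treeAggregate` in 1.1-SNAPSHOT avoids. A minimal sketch of the difference, assuming a local Spark shell; the dimension (4) and the toy data here are made up for illustration, not taken from LBFGS:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object TreeAggSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[4]").setAppName("tree-agg-sketch"))

    // Toy "partial gradients": one small vector per record.
    val grads = sc.parallelize(Seq.fill(1000)(Array(1.0, 2.0, 3.0, 4.0)), numSlices = 8)

    def add(a: Array[Double], b: Array[Double]): Array[Double] =
      a.zip(b).map { case (x, y) => x + y }

    // v1.0.1 style: aggregate() pulls every partition's result straight to the driver,
    // which then does all the summing itself.
    val flat = grads.aggregate(Array.fill(4)(0.0))(add, add)

    // 1.1 style: treeAggregate() first combines partition results on the executors in
    // `depth` levels, so the driver only receives a handful of pre-summed vectors.
    val tree = grads.treeAggregate(Array.fill(4)(0.0))(add, add, depth = 2)

    println(flat.sameElements(tree)) // same sum either way; only the traffic pattern differs
    sc.stop()
  }
}
```

Both calls compute the same result; the win from `treeAggregate` is purely in how much data the driver has to receive at once, which matters when each partial result is a 20M-element gradient.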
spark-cassandra-connector issue
Hello, I'm trying to modify a Spark sample app to integrate with Cassandra; however, I see an exception when submitting the app. Does anyone know why this happens?

Exception in thread main java.lang.NoClassDefFoundError: com/datastax/spark/connector/rdd/reader/RowReaderFactory
at SimpleApp.main(SimpleApp.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:303)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: com.datastax.spark.connector.rdd.reader.RowReaderFactory
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
...
8 more

Source code:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import com.datastax.spark.connector._

object SimpleApp {
  def main(args: Array[String]) {
    val conf = new SparkConf(true)
      .set("spark.cassandra.connection.host", "10.20.132.44")
      .setAppName("Simple Application")
    val logFile = "/home/gzhao/spark/spark-1.0.2-bin-hadoop1/README.md" // Should be some file on your system
    val sc = new SparkContext("spark://mcs-spark-slave1-staging:7077", "idfa_map", conf)
    val rdd = sc.cassandraTable("idfa_map", "bcookie_idfa")
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}
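A NoClassDefFoundError like the one above typically means the spark-cassandra-connector jar was on the compile classpath but was never shipped to the driver/executors at submit time. One way to address it is to register the jar on the SparkConf so Spark distributes it; this is a sketch only, and the jar path below is a placeholder, not a real artifact location:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object SimpleAppWithJars {
  def main(args: Array[String]) {
    val conf = new SparkConf(true)
      .setAppName("Simple Application")
      .set("spark.cassandra.connection.host", "10.20.132.44")
      // Ship the connector (and its transitive dependencies) to the cluster.
      // Replace the path with wherever your connector assembly actually lives.
      .setJars(Seq("/path/to/spark-cassandra-connector-assembly.jar"))

    val sc = new SparkContext("spark://mcs-spark-slave1-staging:7077", "idfa_map", conf)
    // ... cassandraTable(...) calls as in SimpleApp above ...
    sc.stop()
  }
}
```

Alternatively, pass the jar on the command line with `spark-submit --jars`, or build a single fat jar of your app plus the connector (e.g. with sbt-assembly) so there is nothing extra to ship.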