Re: Executor metrics in spark application
I'm also pretty interested in how to create custom Sinks in Spark. I'm using it with Ganglia and the normal metrics from the JVM source do show up. I tried to create my own metric based on Issac's code, but it does not show up in Ganglia. Does anyone know where the problem is? Here's the code snippet:

class AccumulatorSource(accumulator: Accumulator[Long], name: String) extends Source {
  val sourceName = accumulator.metrics
  val metricRegistry = new MetricRegistry()
  metricRegistry.register(MetricRegistry.name(accumulator, name), new Gauge[Long] {
    override def getValue: Long = { return accumulator.value; }
  });
}

and then in the main:

val longAccumulator = sc.accumulator[Long](0);
val accumulatorMetrics = new AccumulatorSource(longAccumulator, "counters.accumulator");
SparkEnv.get.metricsSystem.registerSource(accumulatorMetrics);

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Executor-metrics-in-spark-application-tp188p10385.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
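For reference, a minimal sketch of an accumulator-backed Source along those lines, assuming the Spark 1.0-era metrics API; the source and metric names here are made up, and in some Spark versions the Source trait is private[spark], in which case the class has to live under an org.apache.spark package:

import com.codahale.metrics.{Gauge, MetricRegistry}
import org.apache.spark.{Accumulator, SparkEnv}
import org.apache.spark.metrics.source.Source

// Exposes the current (driver-side) value of an accumulator as a Dropwizard gauge.
class AccumulatorSource(accumulator: Accumulator[Long], name: String) extends Source {
  override val sourceName = name                        // e.g. "parsedMessages"
  override val metricRegistry = new MetricRegistry()
  metricRegistry.register(MetricRegistry.name(name, "value"), new Gauge[Long] {
    override def getValue: Long = accumulator.value     // accumulator values are only readable on the driver
  })
}

// Driver side:
val parsed = sc.accumulator[Long](0)
SparkEnv.get.metricsSystem.registerSource(new AccumulatorSource(parsed, "parsedMessages"))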
Re: Executor metrics in spark application
I meant custom Sources, sorry. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Executor-metrics-in-spark-application-tp188p10386.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
RE: Executor metrics in spark application
Hi Denes, I think you can register your customized metrics source into the metrics system through metrics.properties; you can take metrics.properties.template as a reference. Basically you can do as follows if you want to monitor on the executor:

executor.source.accumulator.class=xx.xx.xx.your-customized-metrics-source

I think the code below can only register the metrics source on the client side:

SparkEnv.get.metricsSystem.registerSource(accumulatorMetrics);

BTW, it's not a good choice to register through MetricsSystem directly; it would be nicer to register through configuration. Also you can enable the console sink to verify whether the source is registered or not. Thanks Jerry

-----Original Message----- From: Denes [mailto:te...@outlook.com] Sent: Tuesday, July 22, 2014 2:02 PM To: u...@spark.incubator.apache.org Subject: Re: Executor metrics in spark application

I'm also pretty interested in how to create custom Sinks in Spark. I'm using it with Ganglia and the normal metrics from the JVM source do show up. I tried to create my own metric based on Issac's code, but it does not show up in Ganglia. Does anyone know where the problem is? Here's the code snippet: class AccumulatorSource(accumulator: Accumulator[Long], name: String) extends Source { val sourceName = accumulator.metrics val metricRegistry = new MetricRegistry() metricRegistry.register(MetricRegistry.name(accumulator, name), new Gauge[Long] { override def getValue: Long = { return accumulator.value; }}); } and then in the main: val longAccumulator = sc.accumulator[Long](0); val accumulatorMetrics = new AccumulatorSource(longAccumulator, "counters.accumulator"); SparkEnv.get.metricsSystem.registerSource(accumulatorMetrics); -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Executor-metrics-in-spark-application-tp188p10385.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
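For example, something along these lines in metrics.properties; the class name is a placeholder for your own source, and note that a source registered this way is created by reflection, so it needs a no-argument constructor:

# register a custom source on the executors (fully-qualified class name is an assumption)
executor.source.accumulator.class=com.example.metrics.AccumulatorSource

# console sink, handy for verifying that the source is actually registered
*.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink
*.sink.console.period=10
*.sink.console.unit=seconds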
number of Cached Partitions v.s. Total Partitions
Hi, I'm using local mode and reading a text file as an RDD using the JavaSparkContext.textFile() API, and then calling the cache() method on the resulting RDD. I look at the Storage information and find the RDD has 3 partitions but only 2 of them have been cached. Is this normal behavior? I assumed either all of the partitions should be cached or none of them. If I'm wrong, what are the cases when the number of cached partitions is less than the total number of partitions?
RE: number of Cached Partitions v.s. Total Partitions
Yes, it's normal when there is not enough memory to hold the third partition, as you can see in your attached picture. Thanks Jerry

From: Haopu Wang [mailto:hw...@qilinsoft.com] Sent: Tuesday, July 22, 2014 3:09 PM To: user@spark.apache.org Subject: number of Cached Partitions v.s. Total Partitions

Hi, I'm using local mode and read a text file as RDD using JavaSparkContext.textFile() API. And then call cache() method on the result RDD. I look at the Storage information and find the RDD has 3 partitions but 2 of them have been cached. Is this a normal behavior? I assume all of partitions should be cached or none of them. If I'm wrong, what are the cases when number of cached partitions is less than the total number of partitions?
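If you want every partition retained even when memory runs short, one option is to persist with a storage level that spills to disk instead of the default cache(); a small sketch (the input path is made up):

import org.apache.spark.storage.StorageLevel

val rdd = sc.textFile("hdfs:///data/input.txt")   // hypothetical input
rdd.persist(StorageLevel.MEMORY_AND_DISK)
rdd.count()   // materializes the RDD; partitions that don't fit in memory are written to local disk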
Re: spark1.0.1 spark sql error java.lang.NoClassDefFoundError: Could not initialize class $line11.$read$
Hi Yin Huai, I tested again with your snippet code. It works well in spark-1.0.1. Here is my code:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
case class Record(data_date: String, mobile: String, create_time: String)
val mobile = Record("2014-07-20", "1234567", "2014-07-19")
val lm = List(mobile)
val mobileRDD = sc.makeRDD(lm)
val mobileSchemaRDD = sqlContext.createSchemaRDD(mobileRDD)
mobileSchemaRDD.registerAsTable("mobile")
sqlContext.sql("select count(1) from mobile").collect()

The result looks like this:

14/07/22 15:49:53 INFO spark.SparkContext: Job finished: collect at SparkPlan.scala:52, took 0.296864832 s
res9: Array[org.apache.spark.sql.Row] = Array([1])

But what is the main cause of this exception? And how did you find it out by looking at unknown characters like $line11.$read$ and $line12.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$ ? Thanks, Victor -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark1-0-1-spark-sql-error-java-lang-NoClassDefFoundError-Could-not-initialize-class-line11-read-tp10135p10390.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Job aborted due to stage failure: TID x failed for unknown reasons
Hi All, Can someone help on this? I'm encountering exactly the same issue in a very similar scenario with the same spark version. Thanks Alessandro On Fri, Jul 18, 2014 at 8:30 PM, Shannon Quinn squ...@gatech.edu wrote: Hi all, I'm dealing with some strange error messages that I *think* comes down to a memory issue, but I'm having a hard time pinning it down and could use some guidance from the experts. I have a 2-machine Spark (1.0.1) cluster. Both machines have 8 cores; one has 16GB memory, the other 32GB (which is the master). My application involves computing pairwise pixel affinities in images, though the images I've tested so far only get as big as 1920x1200, and as small as 16x16. I did have to change a few memory and parallelism settings, otherwise I was getting explicit OutOfMemoryExceptions. In spark-default.conf: spark.executor.memory14g spark.default.parallelism32 spark.akka.frameSize1000 In spark-env.sh: SPARK_DRIVER_MEMORY=10G With those settings, however, I get a bunch of WARN statements about Lost TIDs (no task is successfully completed) in addition to lost Executors, which are repeated 4 times until I finally get the following error message and crash: --- 14/07/18 12:06:20 INFO TaskSchedulerImpl: Cancelling stage 0 14/07/18 12:06:20 INFO DAGScheduler: Failed to run collect at /home/user/Programming/PySpark-Affinities/affinity.py:243 Traceback (most recent call last): File /home/user/Programming/PySpark-Affinities/affinity.py, line 243, in module lambda x: np.abs(IMAGE.value[x[0]] - IMAGE.value[x[1]]) File /net/antonin/home/user/Spark/spark-1.0.1-bin-hadoop2/python/pyspark/rdd.py, line 583, in collect bytesInJava = self._jrdd.collect().iterator() File /net/antonin/home/user/Spark/spark-1.0.1-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py, line 537, in __call__ File /net/antonin/home/user/Spark/spark-1.0.1-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py, line 300, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o27.collect. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0:13 failed 4 times, most recent failure: *TID 32 on host master.host.univ.edu http://master.host.univ.edu failed for unknown reason* Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1044) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1028) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1026) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1026) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:634) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1229) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 14/07/18 12:06:20 INFO DAGScheduler: Executor lost: 4 (epoch 4) 14/07/18 12:06:20 INFO BlockManagerMasterActor: Trying to remove executor 4 from BlockManagerMaster. 14/07/18 12:06:20 INFO BlockManagerMaster: Removed 4 successfully in removeExecutor user@master:~/Programming/PySpark-Affinities$ --- If I run the really small image instead (16x16), it *appears* to run to completion (gives me the output I expect without any exceptions being thrown). However, in the stderr logs for the app that was run, it lists the state as KILLED with the final message a ERROR CoarseGrainedExecutorBackend: Driver Disassociated. If I run any larger images, I get the exception I pasted above. Furthermore, if I just do a spark-submit with master=local[*], aside from still needing to set the aforementioned memory options, it will work for an image of *any*
Re: Why spark-submit command hangs?
I've just had the same problem. I'm using:

$SPARK_HOME/bin/spark-submit --master yarn --deploy-mode client $JOBJAR --class $JOBCLASS

It's really strange, because the log shows:

14/07/22 16:16:58 INFO ui.SparkUI: Started SparkUI at http://k1227.mzhen.cn:4040
14/07/22 16:16:58 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/07/22 16:16:58 INFO spark.SparkContext: Added JAR /home/workspace/ci-demo/target/scala-2.10/SemiData-CIDemo-Library-assembly-0.1.jar at http://192.168.7.37:53050/jars/SemiData-CIDemo-Library-assembly-0.1.jar with timestamp 1406017018666
14/07/22 16:16:58 INFO cluster.YarnClusterScheduler: Created YarnClusterScheduler
14/07/22 16:16:58 INFO yarn.ApplicationMaster$$anon$1: Adding shutdown hook for context org.apache.spark.SparkContext@41ecfc8c

Why does cluster.YarnClusterScheduler start? Where's the Client? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Why-spark-submit-command-hangs-tp10308p10392.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
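One thing worth double-checking in commands like the above: spark-submit treats everything after the application jar as arguments to the application itself, so --class and the other options normally go before the jar. A typical invocation looks like:

$SPARK_HOME/bin/spark-submit \
  --class $JOBCLASS \
  --master yarn \
  --deploy-mode client \
  $JOBJAR [application arguments]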
Re: gain access to persisted rdd
Ok, thanks for the answers. Unfortunately, there is no sc.getPersistentRDDs for pyspark. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/gain-access-to-persisted-rdd-tp10313p10393.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Why spark-submit command hangs?
That's what my problem is:) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Why-spark-submit-command-hangs-tp10308p10394.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: saveAsSequenceFile for DStream
What about simply: dstream.foreachRDD(_.saveAsSequenceFile(...)) ?

On Tue, Jul 22, 2014 at 2:06 AM, Barnaby bfa...@outlook.com wrote: First of all, I do not know Scala, but I am learning. I'm doing a proof of concept by streaming content from a socket, counting the words and writing the counts to a Tachyon disk. A different script will read the file stream and print out the results.

val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.saveAs???Files("tachyon://localhost:19998/files/WordCounts")
ssc.start()
ssc.awaitTermination()

I already did a proof of concept to write and read sequence files, but there doesn't seem to be a saveAsSequenceFiles() method on DStream. What is the best way to write out an RDD to a stream so that the timestamps are in the filenames and so there is minimal overhead in reading the data back in as objects, see below. My simple successful proof was the following:

val rdd = sc.parallelize(Array(("a",2), ("b",3), ("c",1)))
rdd.saveAsSequenceFile("tachyon://.../123.sf2")
val rdd2 = sc.sequenceFile[String,Int]("tachyon://.../123.sf2")

How can I do something similar with streaming? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/saveAsSequenceFile-for-DStream-tp10369.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
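A small sketch of that suggestion which also puts the batch time into each output path (the tachyon prefix is the one from the original post):

wordCounts.foreachRDD { (rdd, time) =>
  // one sequence-file directory per batch, named by the batch timestamp
  rdd.saveAsSequenceFile("tachyon://localhost:19998/files/WordCounts-" + time.milliseconds)
}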
RE: Executor metrics in spark application
Hi Jerry, I know that way of registering a metrics source, but it seems to defeat the whole purpose. I'd like to define a source whose value is set within the application, for example the number of parsed messages. If I register it in metrics.properties, how can I obtain the instance (or instances)? How can I set the property? Is there a way to read an accumulator's value from a Source? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Executor-metrics-in-spark-application-tp188p10397.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Spark over graphviz (SPARK-1015, SPARK-975)
Hi spark. I see there has been some work around graphviz visualization for spark jobs. 1) I'm wondering if anyone is actively maintaining this stuff, and if so what the best docs are for it - or else, if there is interest in an upstream JIRA for updating the graphviz APIs. 2) Also, I am curious about utilities for visualizing/optimizing the flow of data through an RDD at runtime and where those are in the existing codebase. Any thoughts around pipeline visualization for spark would be appreciated. I see some conversations about it in JIRAs but am not sure what the future is for this; possibly I could lend a hand if there are any loose ends needing to be tied. -- jay vyas
RE: Executor metrics in spark application
Yeah, now I see your purpose. The original design of customized metrics sources focused on self-contained sources; since you need to rely on an outer variable, the way you mentioned may be the only way to register. Besides, as you cannot see the source in Ganglia, I think you can enable the console sink to verify the outputs. Also, it seems you want to register this source on the driver, so you need to enable the Ganglia sink on the driver side and make sure the Ganglia client can connect to your driver. Thanks Jerry

-----Original Message----- From: Denes [mailto:te...@outlook.com] Sent: Tuesday, July 22, 2014 6:38 PM To: u...@spark.incubator.apache.org Subject: RE: Executor metrics in spark application

Hi Jerry, I know that way of registering a metrics source, but it seems to defeat the whole purpose. I'd like to define a source whose value is set within the application, for example the number of parsed messages. If I register it in metrics.properties, how can I obtain the instance (or instances)? How can I set the property? Is there a way to read an accumulator's value from a Source? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Executor-metrics-in-spark-application-tp188p10397.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
collect() on small group of Avro files causes plain NullPointerException
Running a simple collect method on a group of Avro objects causes a plain NullPointerException. Does anyone know what may be wrong? files.collect() Press ENTER or type command to continue Exception in thread Executor task launch worker-0 java.lang.NullPointerException at org.apache.spark.executor.Executor$TaskRunner$$anonfun$2.apply(Executor.scala:254) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$2.apply(Executor.scala:254) at scala.Option.flatMap(Option.scala:170) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:254) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/collect-on-small-group-of-Avro-files-causes-plain-NullPointerException-tp10400.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: collect() on small list causes NullPointerException
For those curious, I was using a KryoRegistrator and it was causing the null pointer exception. I removed the code and the problem went away. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/collect-on-small-list-causes-NullPointerException-tp10400p10402.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: collect() on small group of Avro files causes plain NullPointerException
Do you have a list/array in your avro record? If yes, this could cause the problem. I experienced this kind of problem and solved it by providing a custom kryo ser/de for avro lists. Also be careful: spark reuses records, so if you just read them and don't copy/transform them, you would end up with the records all having the same values.

2014-07-22 15:01 GMT+02:00 Sparky gullo_tho...@bah.com: Running a simple collect method on a group of Avro objects causes a plain NullPointerException. Does anyone know what may be wrong? files.collect() Press ENTER or type command to continue Exception in thread Executor task launch worker-0 java.lang.NullPointerException at org.apache.spark.executor.Executor$TaskRunner$$anonfun$2.apply(Executor.scala:254) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$2.apply(Executor.scala:254) at scala.Option.flatMap(Option.scala:170) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:254) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/collect-on-small-group-of-Avro-files-causes-plain-NullPointerException-tp10400.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
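In case it helps others hitting this, a bare-bones sketch of wiring up a Kryo registrator for the Avro list type; whether you also need a custom Serializer (passed as the second argument to kryo.register) depends on your records:

import com.esotericsoftware.kryo.Kryo
import org.apache.avro.generic.GenericData
import org.apache.spark.serializer.KryoRegistrator

class AvroRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    // register the Avro list implementation used inside generated records
    kryo.register(classOf[GenericData.Array[_]])
  }
}

// in the SparkConf:
// conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// conf.set("spark.kryo.registrator", classOf[AvroRegistrator].getName)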
hadoop version
Hi, Where can I find the version of Hadoop my cluster is using? I launched my ec2 cluster using the spark-ec2 script with the --hadoop-major-version=2 option. However, the folder hadoop-native/lib in the master node only contains files that end in 1.0.0. Does that mean that I have Hadoop version 1? Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/hadoop-version-tp10405.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
the implications of some items in webUI
Hi, I am analysing my application's processing on Spark (GraphX), but I am feeling a little confused about some items in the web UI. 1) What is the difference between Duration (Stages - Completed Stages) and Task Time (Executors)? For instance, 43 s vs. 5.6 m. Is Task Time approximately Duration multiplied by Total Tasks? 2) What are the exact meanings of Shuffle Read/Shuffle Write? Best, Yifan LI
Using case classes as keys does not seem to work.
Using a case class as a key doesn't seem to work properly. [Spark 1.0.0] A minimal example:

case class P(name: String)
val ps = Array(P("alice"), P("bob"), P("charly"), P("bob"))
sc.parallelize(ps).map(x => (x, 1)).reduceByKey((x, y) => x + y).collect

[Spark shell local mode]
res: Array[(P, Int)] = Array((P(bob),1), (P(bob),1), (P(abe),1), (P(charly),1))

In contrast to the expected behavior, that should be equivalent to:

sc.parallelize(ps).map(x => (x.name, 1)).reduceByKey((x, y) => x + y).collect
Array[(String, Int)] = Array((charly,1), (abe,1), (bob,2))

Any ideas why this doesn't work? -kr, Gerard.
Re: Is there anyone who use streaming join to filter spam as guide mentioned?
Hi TD, Eventually I found that I had made a mistake - the RDD I used for the join did not contain any content. Now it works. Thanks, Hawk

On 2014-07-21 17:58, Tathagata Das wrote: Could you share your code snippet so that we can take a look? TD

On Mon, Jul 21, 2014 at 7:23 AM, hawkwang wanghao.b...@gmail.com wrote: Hello guys, I'm just trying to use spark streaming features. I noticed that there is a join example for filtering spam, so I just wanted to try it. But nothing happens after the join; the output JavaPairDStream content is the same as before. So, are there any examples that I can refer to? Thanks for any suggestions. Regards, Hawk
Re: Using case classes as keys does not seem to work.
Just to narrow down the issue, it looks like the problem is in 'reduceByKey' and derivatives like 'distinct'. groupByKey() seems to work:

sc.parallelize(ps).map(x => (x.name, 1)).groupByKey().collect
res: Array[(String, Iterable[Int])] = Array((charly,ArrayBuffer(1)), (abe,ArrayBuffer(1)), (bob,ArrayBuffer(1, 1)))

On Tue, Jul 22, 2014 at 4:20 PM, Gerard Maas gerard.m...@gmail.com wrote: Using a case class as a key doesn't seem to work properly. [Spark 1.0.0] A minimal example: case class P(name: String) val ps = Array(P("alice"), P("bob"), P("charly"), P("bob")) sc.parallelize(ps).map(x => (x, 1)).reduceByKey((x, y) => x + y).collect [Spark shell local mode] res: Array[(P, Int)] = Array((P(bob),1), (P(bob),1), (P(abe),1), (P(charly),1)) In contrast to the expected behavior, that should be equivalent to: sc.parallelize(ps).map(x => (x.name, 1)).reduceByKey((x, y) => x + y).collect Array[(String, Int)] = Array((charly,1), (abe,1), (bob,2)) Any ideas why this doesn't work? -kr, Gerard.
Spark Streaming - How to save all items in batchs from beginning to a single stream rdd?
Hi guys, Is it possible to generate a single stream RDD which can be updated with each new batch RDD's content? I know that we can use updateStateByKey to do aggregation, but here I just want to keep track of all the historical original content. I also noticed that we can save to redis or another storage system, but can we make it happen using just the spark streaming mechanism? Thanks for any suggestion. Regards, Hawk
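One way to do this purely inside streaming is to make the accumulated history itself the state; a rough sketch, assuming a DStream of (String, String) pairs called pairs and a checkpoint directory already set. Note the state grows without bound, which is usually why people push history out to external storage instead:

val history = pairs.updateStateByKey[Seq[String]] { (newValues: Seq[String], state: Option[Seq[String]]) =>
  // append this batch's values to everything seen so far for the key
  Some(state.getOrElse(Seq.empty[String]) ++ newValues)
}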
Re: Problem running Spark shell (1.0.0) on EMR
I am also having exactly the same problem, calling it from pyspark. Has anyone managed to get this script to work? -- Martin Goodson | VP Data Science (0)20 3397 1240

On Wed, Jul 16, 2014 at 2:10 PM, Ian Wilkinson ia...@me.com wrote: Hi, I'm trying to run the Spark (1.0.0) shell on EMR and encountering a classpath issue. I suspect I'm missing something gloriously obvious, but so far it is eluding me. I launch the EMR cluster (using the aws cli) with:

aws emr create-cluster --name "Test Cluster" \
  --ami-version 3.0.3 \
  --no-auto-terminate \
  --ec2-attributes KeyName=... \
  --bootstrap-actions Path=s3://elasticmapreduce/samples/spark/1.0.0/install-spark-shark.rb \
  --instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=m1.medium \
  InstanceGroupType=CORE,InstanceCount=1,InstanceType=m1.medium --region eu-west-1

then,

$ aws emr ssh --cluster-id ... --key-pair-file ... --region eu-west-1

On the master node, I then launch the shell with:

[hadoop@ip-... spark]$ ./bin/spark-shell

and try performing:

scala> val logs = sc.textFile("s3n://.../")

this produces:

14/07/16 12:40:35 WARN storage.BlockManager: Putting block broadcast_0 failed
java.lang.NoSuchMethodError: com.google.common.hash.HashFunction.hashInt(I)Lcom/google/common/hash/HashCode;

Any help mighty welcome, ian
Re: Using case classes as keys does not seem to work.
I can confirm this bug. The behavior for groupByKey is the same as reduceByKey - your example is actually grouping on just the name. Try this:

sc.parallelize(ps).map(x => (x, 1)).groupByKey().collect
res1: Array[(P, Iterable[Int])] = Array((P(bob),ArrayBuffer(1)), (P(bob),ArrayBuffer(1)), (P(alice),ArrayBuffer(1)), (P(charly),ArrayBuffer(1)))

On Tue, Jul 22, 2014 at 10:30 AM, Gerard Maas gerard.m...@gmail.com wrote: Just to narrow down the issue, it looks like the problem is in 'reduceByKey' and derivatives like 'distinct'. groupByKey() seems to work: sc.parallelize(ps).map(x => (x.name, 1)).groupByKey().collect res: Array[(String, Iterable[Int])] = Array((charly,ArrayBuffer(1)), (abe,ArrayBuffer(1)), (bob,ArrayBuffer(1, 1))) On Tue, Jul 22, 2014 at 4:20 PM, Gerard Maas gerard.m...@gmail.com wrote: Using a case class as a key doesn't seem to work properly. [Spark 1.0.0] A minimal example: case class P(name: String) val ps = Array(P("alice"), P("bob"), P("charly"), P("bob")) sc.parallelize(ps).map(x => (x, 1)).reduceByKey((x, y) => x + y).collect [Spark shell local mode] res: Array[(P, Int)] = Array((P(bob),1), (P(bob),1), (P(abe),1), (P(charly),1)) In contrast to the expected behavior, that should be equivalent to: sc.parallelize(ps).map(x => (x.name, 1)).reduceByKey((x, y) => x + y).collect Array[(String, Int)] = Array((charly,1), (abe,1), (bob,2)) Any ideas why this doesn't work? -kr, Gerard. -- Daniel Siegmann, Software Developer Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001 E: daniel.siegm...@velos.io W: www.velos.io
Tranforming flume events using Spark transformation functions
Hi All, I am getting events from flume using the following line:

JavaDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port);

Each event is a delimited record. I'd like to use some of the transformation functions like map and reduce on this. Do I need to convert the JavaDStream<SparkFlumeEvent> to JavaDStream<String>, or can I apply these functions directly on it? I need to do the following kind of operations: AA YDelta TAA Southwest AA Unique tickets are , Y, , , . Count is 2, 1, T 1 and so on... AA - 2 tickets (should not count the duplicate), Delta - 1 ticket, Southwest - 1 ticket. I have to do transformations like this. Right now I am able to receive records, but I am struggling to transform them using spark transformation functions since they are not of type JavaRDD<String>. Can I create a new JavaRDD<String>? How do I create a new JavaRDD? I loop through the events like below:

flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
  @Override
  public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
    String logRecord = null;
    List<SparkFlumeEvent> events = eventsData.collect();
    Iterator<SparkFlumeEvent> batchedEvents = events.iterator();
    long t1 = System.currentTimeMillis();
    AvroFlumeEvent avroEvent = null;
    ByteBuffer bytePayload = null;
    // All the user level data is carried as payload in Flume Event
    while (batchedEvents.hasNext()) {
      SparkFlumeEvent flumeEvent = batchedEvents.next();
      avroEvent = flumeEvent.event();
      bytePayload = avroEvent.getBody();
      logRecord = new String(bytePayload.array());
      System.out.println("LOG RECORD = " + logRecord);
    }

Where do I create the new JavaRDD<String>? Do I do it before this loop? How do I create this JavaRDD<String>? In the loop I am able to get every record and I am able to print them. I appreciate any help here. Thanks, Muthu
Re: Using case classes as keys does not seem to work.
Yes, right. 'sc.parallelize(ps).map(x= (**x.name**,1)).groupByKey().collect ' An oversight from my side. Thanks!, Gerard. On Tue, Jul 22, 2014 at 5:24 PM, Daniel Siegmann daniel.siegm...@velos.io wrote: I can confirm this bug. The behavior for groupByKey is the same as reduceByKey - your example is actually grouping on just the name. Try this: sc.parallelize(ps).map(x= (x,1)).groupByKey().collect res1: Array[(P, Iterable[Int])] = Array((P(bob),ArrayBuffer(1)), (P(bob),ArrayBuffer(1)), (P(alice),ArrayBuffer(1)), (P(charly),ArrayBuffer(1))) On Tue, Jul 22, 2014 at 10:30 AM, Gerard Maas gerard.m...@gmail.com wrote: Just to narrow down the issue, it looks like the issue is in 'reduceByKey' and derivates like 'distinct'. groupByKey() seems to work sc.parallelize(ps).map(x= (x.name,1)).groupByKey().collect res: Array[(String, Iterable[Int])] = Array((charly,ArrayBuffer(1)), (abe,ArrayBuffer(1)), (bob,ArrayBuffer(1, 1))) On Tue, Jul 22, 2014 at 4:20 PM, Gerard Maas gerard.m...@gmail.com wrote: Using a case class as a key doesn't seem to work properly. [Spark 1.0.0] A minimal example: case class P(name:String) val ps = Array(P(alice), P(bob), P(charly), P(bob)) sc.parallelize(ps).map(x= (x,1)).reduceByKey((x,y) = x+y).collect [Spark shell local mode] res : Array[(P, Int)] = Array((P(bob),1), (P(bob),1), (P(abe),1), (P(charly),1)) In contrast to the expected behavior, that should be equivalent to: sc.parallelize(ps).map(x= (x.name,1)).reduceByKey((x,y) = x+y).collect Array[(String, Int)] = Array((charly,1), (abe,1), (bob,2)) Any ideas why this doesn't work? -kr, Gerard. -- Daniel Siegmann, Software Developer Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001 E: daniel.siegm...@velos.io W: www.velos.io
Re: saveAsSequenceFile for DStream
Thanks Sean! I got that working last night similar to how you solved it. Any ideas about how to monitor that same folder in another script by creating a stream? I can use sc.sequenceFile() to read in the RDD, but how do I get the name of the file that got added since there is no sequenceFileStream() method? Thanks again for your help. On Jul 22, 2014, at 1:57, Sean Owen so...@cloudera.com wrote: What about simply: dstream.foreachRDD(_.saveAsSequenceFile(...)) ? On Tue, Jul 22, 2014 at 2:06 AM, Barnaby bfa...@outlook.com wrote: First of all, I do not know Scala, but learning. I'm doing a proof of concept by streaming content from a socket, counting the words and write it to a Tachyon disk. A different script will read the file stream and print out the results. val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER) val words = lines.flatMap(_.split( )) val wordCounts = words.map(x = (x, 1)).reduceByKey(_ + _) wordCounts.saveAs???Files(tachyon://localhost:19998/files/WordCounts) ssc.start() ssc.awaitTermination() I already did a proof of concept to write and read sequence files but there doesn't seem to be a saveAsSequenceFiles() method in DStream. What is the best way to write out an RDD to a stream so that the timestamps are in the filenames and so there is minimal overhead in reading the data back in as objects, see below. My simple successful proof was the following: val rdd = sc.parallelize(Array((a,2), (b,3), (c,1))) rdd.saveAsSequenceFile(tachyon://.../123.sf2) val rdd2 = sc.sequenceFile[String,Int](tachyon://.../123.sf2) How can I do something similar with streaming? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/saveAsSequenceFile-for-DStream-tp10369.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
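There is no sequenceFileStream(), but the generic fileStream can monitor a directory for newly written sequence files; a sketch, assuming the (String, Int) pairs saved above into the same tachyon directory:

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat

val counts = ssc.fileStream[Text, IntWritable, SequenceFileInputFormat[Text, IntWritable]](
    "tachyon://localhost:19998/files/")
  .map { case (k, v) => (k.toString, v.get) }   // back to (String, Int)
counts.print()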
Re: Using case classes as keys does not seem to work.
I created https://issues.apache.org/jira/browse/SPARK-2620 to track this. Maybe useful to know, this is a regression on Spark 1.0.0. I tested the same sample code on 0.9.1 and it worked (we have several jobs using case classes as key aggregators, so it better does) -kr, Gerard. On Tue, Jul 22, 2014 at 5:37 PM, Gerard Maas gerard.m...@gmail.com wrote: Yes, right. 'sc.parallelize(ps).map(x= (**x.name**,1)).groupByKey(). collect' An oversight from my side. Thanks!, Gerard. On Tue, Jul 22, 2014 at 5:24 PM, Daniel Siegmann daniel.siegm...@velos.io wrote: I can confirm this bug. The behavior for groupByKey is the same as reduceByKey - your example is actually grouping on just the name. Try this: sc.parallelize(ps).map(x= (x,1)).groupByKey().collect res1: Array[(P, Iterable[Int])] = Array((P(bob),ArrayBuffer(1)), (P(bob),ArrayBuffer(1)), (P(alice),ArrayBuffer(1)), (P(charly),ArrayBuffer(1))) On Tue, Jul 22, 2014 at 10:30 AM, Gerard Maas gerard.m...@gmail.com wrote: Just to narrow down the issue, it looks like the issue is in 'reduceByKey' and derivates like 'distinct'. groupByKey() seems to work sc.parallelize(ps).map(x= (x.name,1)).groupByKey().collect res: Array[(String, Iterable[Int])] = Array((charly,ArrayBuffer(1)), (abe,ArrayBuffer(1)), (bob,ArrayBuffer(1, 1))) On Tue, Jul 22, 2014 at 4:20 PM, Gerard Maas gerard.m...@gmail.com wrote: Using a case class as a key doesn't seem to work properly. [Spark 1.0.0] A minimal example: case class P(name:String) val ps = Array(P(alice), P(bob), P(charly), P(bob)) sc.parallelize(ps).map(x= (x,1)).reduceByKey((x,y) = x+y).collect [Spark shell local mode] res : Array[(P, Int)] = Array((P(bob),1), (P(bob),1), (P(abe),1), (P(charly),1)) In contrast to the expected behavior, that should be equivalent to: sc.parallelize(ps).map(x= (x.name,1)).reduceByKey((x,y) = x+y).collect Array[(String, Int)] = Array((charly,1), (abe,1), (bob,2)) Any ideas why this doesn't work? -kr, Gerard. -- Daniel Siegmann, Software Developer Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001 E: daniel.siegm...@velos.io W: www.velos.io
Re: Spark Streaming source from Amazon Kinesis
i took this over from parviz. i recently submitted a new PR for Kinesis Spark Streaming support: https://github.com/apache/spark/pull/1434 others have tested it with good success, so give it a whirl! waiting for it to be reviewed/merged. please put any feedback into the PR directly. thanks! -chris On Mon, Apr 21, 2014 at 2:39 PM, Matei Zaharia matei.zaha...@gmail.com wrote: No worries, looking forward to it! Matei On Apr 21, 2014, at 1:59 PM, Parviz Deyhim pdey...@gmail.com wrote: sorry Matei. Will definitely start working on making the changes soon :) On Mon, Apr 21, 2014 at 1:10 PM, Matei Zaharia matei.zaha...@gmail.com wrote: There was a patch posted a few weeks ago ( https://github.com/apache/spark/pull/223), but it needs a few changes in packaging because it uses a license that isn’t fully compatible with Apache. I’d like to get this merged when the changes are made though — it would be a good input source to support. Matei On Apr 21, 2014, at 1:00 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: I'm looking to start experimenting with Spark Streaming, and I'd like to use Amazon Kinesis https://aws.amazon.com/kinesis/ as my data source. Looking at the list of supported Spark Streaming sources http://spark.apache.org/docs/latest/streaming-programming-guide.html#linking, I don't see any mention of Kinesis. Is it possible to use Spark Streaming with Amazon Kinesis? If not, are there plans to add such support in the future? Nick -- View this message in context: Spark Streaming source from Amazon Kinesis http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-source-from-Amazon-Kinesis-tp4550.html Sent from the Apache Spark User List mailing list archive http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com http://nabble.com/.
Re: data locality
On standalone there is still special handling for assigning tasks within executors. There just isn't special handling for where to place executors, because standalone generally places an executor on every node. On Mon, Jul 21, 2014 at 7:42 PM, Haopu Wang hw...@qilinsoft.com wrote: Sandy, I just tried the standalone cluster and didn't have chance to try Yarn yet. So if I understand correctly, there are **no** special handling of task assignment according to the HDFS block's location when Spark is running as a **standalone** cluster. Please correct me if I'm wrong. Thank you for your patience! -- *From:* Sandy Ryza [mailto:sandy.r...@cloudera.com] *Sent:* 2014年7月22日 9:47 *To:* user@spark.apache.org *Subject:* Re: data locality This currently only works for YARN. The standalone default is to place an executor on every node for every job. The total number of executors is specified by the user. -Sandy On Fri, Jul 18, 2014 at 2:00 AM, Haopu Wang hw...@qilinsoft.com wrote: Sandy, Do you mean the “preferred location” is working for standalone cluster also? Because I check the code of SparkContext and see comments as below: // This is used only by YARN for now, but should be relevant to other cluster types (*Mesos*, // etc) too. This is typically generated from InputFormatInfo.computePreferredLocations. It // contains a map from *hostname* to a list of input format splits on the host. *private*[spark] *var* preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map() BTW, even with the preferred hosts, how does Spark decide how many total executors to use for this application? Thanks again! -- *From:* Sandy Ryza [mailto:sandy.r...@cloudera.com] *Sent:* Friday, July 18, 2014 3:44 PM *To:* user@spark.apache.org *Subject:* Re: data locality Hi Haopu, Spark will ask HDFS for file block locations and try to assign tasks based on these. There is a snag. Spark schedules its tasks inside of executor processes that stick around for the lifetime of a Spark application. Spark requests executors before it runs any jobs, i.e. before it has any information about where the input data for the jobs is located. If the executors occupy significantly fewer nodes than exist in the cluster, it can be difficult for Spark to achieve data locality. The workaround for this is an API that allows passing in a set of preferred locations when instantiating a Spark context. This API is currently broken in Spark 1.0, and will likely changed to be something a little simpler in a future release. val locData = InputFormatInfo.computePreferredLocations (Seq(new InputFormatInfo(conf, classOf[TextInputFormat], new Path(“myfile.txt”))) val sc = new SparkContext(conf, locData) -Sandy On Fri, Jul 18, 2014 at 12:35 AM, Haopu Wang hw...@qilinsoft.com wrote: I have a standalone spark cluster and a HDFS cluster which share some of nodes. When reading HDFS file, how does spark assign tasks to nodes? Will it ask HDFS the location for each file block in order to get a right worker node? How about a spark cluster on Yarn? Thank you very much!
Re: Spark 1.0.1 SQL on 160 G parquet file (snappy compressed, made by cloudera impala), 23 core and 60G mem / node, yarn-client mode, always failed
Hi, I don't think anybody has been testing importing of Impala tables directly. Is there any chance to export these first, say as unpartitioned Hive tables and import these? Just an idea.. Andre On 07/21/2014 11:46 PM, chutium wrote: no, something like this 14/07/20 00:19:29 ERROR cluster.YarnClientClusterScheduler: Lost executor 2 on 02.xxx: remote Akka client disassociated ... ... 14/07/20 00:21:13 WARN scheduler.TaskSetManager: Lost TID 832 (task 1.2:186) 14/07/20 00:21:13 WARN scheduler.TaskSetManager: Loss was due to java.io.IOException java.io.IOException: Filesystem closed at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:703) at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:779) at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:840) at java.io.DataInputStream.readFully(DataInputStream.java:195) at java.io.DataInputStream.readFully(DataInputStream.java:169) at parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:599) at parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:360) at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:100) at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:172) at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:130) at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:122) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) at org.apache.spark.scheduler.Task.run(Task.scala:51) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) ulimit is increased -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-0-1-SQL-on-160-G-parquet-file-snappy-compressed-made-by-cloudera-impala-23-core-and-60G-mem-d-tp10254p10344.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Spark sql with hive table running on Yarn-cluster mode
Hi, For running spark sql, the datanucleus*.jar files are automatically added to the classpath. This works fine for spark standalone mode and yarn-client mode; however, for yarn-cluster mode I have to explicitly pass these jars using the --jars option when submitting the job, otherwise the job will fail. Why doesn't it work for yarn-cluster mode? Thank you for your help! Jenny
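For anyone hitting the same thing, the workaround described above looks roughly like this; the jar versions and paths are assumptions, use whatever datanucleus jars ship under your Spark build's lib_managed/jars directory:

spark-submit --master yarn --deploy-mode cluster \
  --jars lib_managed/jars/datanucleus-api-jdo-3.2.1.jar,lib_managed/jars/datanucleus-core-3.2.2.jar,lib_managed/jars/datanucleus-rdbms-3.2.1.jar \
  --class com.example.MyHiveApp my-hive-app.jar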
Spark app vs SparkSQL app
I could possibly use the Spark API and write a batch app to provide some per-web-page stats such as views, uniques, etc. The same can be achieved using SparkSQL, so I wanted to check: * What are the best practices and pros/cons of either approach? * Does SparkSQL require registerAsTable for every batch, or does the table created persist? Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-app-vs-SparkSQL-app-tp10422.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Why spark-submit command hangs?
Hi Earthson, Is your problem resolved? The way you submit your application looks alright to me; spark-submit should be able to parse the combination of --master and --deploy-mode correctly. I suspect you might have hard-coded yarn-cluster or something in your application. Andrew 2014-07-22 1:25 GMT-07:00 Earthson earthson...@gmail.com: That's what my problem is:) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Why-spark-submit-command-hangs-tp10308p10394.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: hadoop version
Hi Maria, Having files that end with 1.0.0 means you're Spark 1.0, not Hadoop 1.0. You can check your hadoop version by running $HADOOP_HOME/bin/hadoop version, where HADOOP_HOME is set to your installation of hadoop. On the clusters started by the Spark ec2 scripts, this should be /root/ephemeral-hdfs. Andrew 2014-07-22 7:07 GMT-07:00 mrm ma...@skimlinks.com: Hi, Where can I find the version of Hadoop my cluster is using? I launched my ec2 cluster using the spark-ec2 script with the --hadoop-major-version=2 option. However, the folder hadoop-native/lib in the master node only contains files that end in 1.0.0. Does that mean that I have Hadoop version 1? Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/hadoop-version-tp10405.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: spark streaming rate limiting from kafka
Hi Tobias, I tried to use 10 as numPartition. The number of executors allocated is the number of DStream. Therefore, it seems the parameter does not spread data into many partitions. In order to to that, it seems we have to do repartition. If numPartitions will distribute the data to multiple executors/partitions, then I will be able to save the running time incurred by repartition. Bill On Mon, Jul 21, 2014 at 6:43 PM, Tobias Pfeiffer t...@preferred.jp wrote: Bill, numPartitions means the number of Spark partitions that the data received from Kafka will be split to. It has nothing to do with Kafka partitions, as far as I know. If you create multiple Kafka consumers, it doesn't seem like you can specify which consumer will consume which Kafka partitions. Instead, Kafka (at least with the interface that is exposed by the Spark Streaming API) will do something called rebalance and assign Kafka partitions to consumers evenly, you can see this in the client logs. When using multiple Kafka consumers with auto.offset.reset = true, please expect to run into this one: https://issues.apache.org/jira/browse/SPARK-2383 Tobias On Tue, Jul 22, 2014 at 3:40 AM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi Tathagata, I am currentlycreating multiple DStream to consumefrom different topics. How can I let each consumer consume from different partitions. I find the following parameters from Spark API: createStream[K, V, U : Decoder[_], T : Decoder[_]](jssc: JavaStreamingContext https://spark.apache.org/docs/1.0.0/api/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.html , keyTypeClass: Class[K], valueTypeClass: Class[V],keyDecoderClass: Class [U], valueDecoderClass: Class[T], kafkaParams: Map[String, String], topics: Map[String, Integer],storageLevel: StorageLevel https://spark.apache.org/docs/1.0.0/api/scala/org/apache/spark/storage/StorageLevel.html ): JavaPairReceiverInputDStream https://spark.apache.org/docs/1.0.0/api/scala/org/apache/spark/streaming/api/java/JavaPairReceiverInputDStream.html [K, V] Create an input stream that pulls messages form a Kafka Broker. The topics parameter is: *Map of (topic_name - numPartitions) to consume. Each partition is consumed in its own thread* Does numPartitions mean the total number of partitions to consume from topic_name or the index of the partition? How can we specify for each createStream which partition of the Kafka topic to consume? I think if so, I will get a lot of parallelism from the source of the data. Thanks! Bill On Thu, Jul 17, 2014 at 6:21 PM, Tathagata Das tathagata.das1...@gmail.com wrote: You can create multiple kafka stream to partition your topics across them, which will run multiple receivers or multiple executors. This is covered in the Spark streaming guide. http://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving And for the purpose of this thread, to answer the original question, we now have the ability https://issues.apache.org/jira/browse/SPARK-1854?jql=project%20%3D%20SPARK%20AND%20resolution%20%3D%20Unresolved%20AND%20component%20%3D%20Streaming%20ORDER%20BY%20priority%20DESC to limit the receiving rate. Its in the master branch, and will be available in Spark 1.1. It basically sets the limits at the receiver level (so applies to all sources) on what is the max records per second that can will be received by the receiver. 
TD On Thu, Jul 17, 2014 at 6:15 PM, Tobias Pfeiffer t...@preferred.jp wrote: Bill, are you saying, after repartition(400), you have 400 partitions on one host and the other hosts receive nothing of the data? Tobias On Fri, Jul 18, 2014 at 8:11 AM, Bill Jay bill.jaypeter...@gmail.com wrote: I also have an issue consuming from Kafka. When I consume from Kafka, there are always a single executor working on this job. Even I use repartition, it seems that there is still a single executor. Does anyone has an idea how to add parallelism to this job? On Thu, Jul 17, 2014 at 2:06 PM, Chen Song chen.song...@gmail.com wrote: Thanks Luis and Tobias. On Tue, Jul 1, 2014 at 11:39 PM, Tobias Pfeiffer t...@preferred.jp wrote: Hi, On Wed, Jul 2, 2014 at 1:57 AM, Chen Song chen.song...@gmail.com wrote: * Is there a way to control how far Kafka Dstream can read on topic-partition (via offset for example). By setting this to a small number, it will force DStream to read less data initially. Please see the post at http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201406.mbox/%3ccaph-c_m2ppurjx-n_tehh0bvqe_6la-rvgtrf1k-lwrmme+...@mail.gmail.com%3E Kafka's auto.offset.reset parameter may be what you are looking for. Tobias -- Chen Song
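For the record, the usual pattern for receiver-side parallelism is several createStream calls unioned together, with repartition only if more processing parallelism is needed afterwards; a sketch with made-up topic and ZooKeeper names:

import org.apache.spark.streaming.kafka.KafkaUtils

val numReceivers = 4
val kafkaStreams = (1 to numReceivers).map { _ =>
  KafkaUtils.createStream(ssc, "zk-host:2181", "my-consumer-group", Map("my-topic" -> 1))
}
val unified = ssc.union(kafkaStreams)
val spread = unified.repartition(10)   // optional: spread processing over more partitions/executors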
Re: Spark job tracker.
I don't understand what you're trying to do. The code will use log4j under the covers. The default configuration means writing log messages to stderr. In yarn-client mode that is your terminal screen, in yarn-cluster mode that is redirected to a file by Yarn. For the executors, that will always be redirected to a file (since they're launched by yarn). I don't know what you mean by port. But if neither of those options is what you're looking for, you need to look at providing a custom log4j configuration that does what you want. On Sun, Jul 20, 2014 at 11:05 PM, abhiguruvayya sharath.abhis...@gmail.com wrote: Hello Marcelo Vanzin, Can you explain bit more on this? I tried using client mode but can you explain how can i use this port to write the log or output to this port?Thanks in advance! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-job-tracker-tp8367p10287.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- Marcelo
combineByKey at ShuffledDStream.scala
Hi all, I am currently running a Spark Streaming program which consumes data from Kafka and does a group-by operation on the data. I am trying to optimize the running time of the program because it looks slow to me. It seems the stage named *combineByKey at ShuffledDStream.scala:42* always takes the longest running time, and if I open this stage, I only see two executors on it. Does anyone have an idea what this stage does and how to increase its speed? Thanks! Bill
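One knob worth checking for that stage is the numPartitions argument on the pair-DStream operations, since the combineByKey stage takes its parallelism from there; a sketch assuming a DStream of (String, Long) pairs called pairs:

// spread the shuffle behind the group-by over more tasks (32 is an arbitrary example)
val grouped = pairs.groupByKey(32)
// or, if the per-key work can be expressed as a reduce, avoid grouping entirely:
val summed = pairs.reduceByKey(_ + _, 32)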
Re: Spark 1.0.1 SQL on 160 G parquet file (snappy compressed, made by cloudera impala), 23 core and 60G mem / node, yarn-client mode, always failed
I haven't had a chance to look at the details of this issue, but we have seen Spark successfully read Parquet tables created by Impala. On Tue, Jul 22, 2014 at 10:10 AM, Andre Schumacher andre.sc...@gmail.com wrote: Hi, I don't think anybody has been testing importing of Impala tables directly. Is there any chance to export these first, say as unpartitioned Hive tables and import these? Just an idea.. Andre On 07/21/2014 11:46 PM, chutium wrote: no, something like this 14/07/20 00:19:29 ERROR cluster.YarnClientClusterScheduler: Lost executor 2 on 02.xxx: remote Akka client disassociated ... ... 14/07/20 00:21:13 WARN scheduler.TaskSetManager: Lost TID 832 (task 1.2:186) 14/07/20 00:21:13 WARN scheduler.TaskSetManager: Loss was due to java.io.IOException java.io.IOException: Filesystem closed at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:703) at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:779) at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:840) at java.io.DataInputStream.readFully(DataInputStream.java:195) at java.io.DataInputStream.readFully(DataInputStream.java:169) at parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:599) at parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:360) at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:100) at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:172) at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:130) at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:122) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) at org.apache.spark.scheduler.Task.run(Task.scala:51) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) ulimit is increased -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-0-1-SQL-on-160-G-parquet-file-snappy-compressed-made-by-cloudera-impala-23-core-and-60G-mem-d-tp10254p10344.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Need info on log4j.properties for apache spark.
Hello All, Basically I need to edit log4j.properties to filter out some of the unnecessary logs in spark on yarn-client mode. I am not sure where I can find the log4j.properties file (its location). Can anyone help me with this? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Need-info-on-log4j-properties-for-apache-spark-tp10431.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark job tracker.
I fixed the error with the yarn-client mode issue which I mentioned in my earlier post. Now I want to edit log4j.properties to filter out some of the unnecessary logs. Can you let me know where I can find this properties file? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-job-tracker-tp8367p10433.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Spark Streaming: no job has started yet
Hi all, I am running a spark streaming job. The job hangs on one stage, which shows as follows:

Details for Stage 4
Summary Metrics: No tasks have started yet
Tasks: No tasks have started yet

Does anyone have an idea on this? Thanks! Bill
What if there are large, read-only variables shared by all map functions?
Hi there, I was wondering if anybody could help me find an efficient way to write a MapReduce program like this: 1) Each map function needs access to some huge files, around 6 GB in total. 2) These files are READ-ONLY. Actually they are like a huge look-up table, which will not change for 2~3 years. I tried two ways to make the program work, but neither of them is efficient: 1) The first approach I tried is to let each map function load those files independently, like this: map (...) { load(files); DoMapTask(...) } 2) The second approach I tried is to load the files before RDD.map(...) and broadcast the files. However, because the files are too large, the broadcasting overhead is 30 min ~ 1 hour. Could anybody help me find an efficient way to solve this? Thanks very much. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/What-if-there-are-large-read-only-variables-shared-by-all-map-functions-tp10435.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
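A third option worth considering: load the lookup data lazily inside a singleton object, so each executor JVM loads it once and every task on that executor reuses it, with no broadcast at all. A rough sketch, assuming an RDD[String] called rdd and a lookup file readable from every worker node (the loader and path are placeholders):

// Loaded at most once per executor JVM, the first time a task touches it.
object Lookup {
  lazy val table: Map[String, String] = loadTable("/local/path/lookup.tsv")

  private def loadTable(path: String): Map[String, String] =
    scala.io.Source.fromFile(path).getLines().map { line =>
      val Array(k, v) = line.split("\t", 2)   // placeholder format: key<TAB>value
      k -> v
    }.toMap
}

val result = rdd.mapPartitions { iter =>
  val table = Lookup.table                    // no broadcast, no per-task reload
  iter.map(record => table.getOrElse(record, record))
}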
Re: Very wierd behavior
Is the first() being computed locally on the driver program? Maybe it's too hard to compute with the memory, etc. available there. Take a look at the driver's log and see whether it has the message "Computing the requested partition locally". Matei

On Jul 22, 2014, at 12:04 PM, Nathan Kronenfeld nkronenf...@oculusinfo.com wrote: I was wondering if anyone could provide an explanation for the behavior I'm seeing. I have an RDD, call it foo, not too complex, with maybe an 8-level-deep DAG with 2 shuffles, not empty, not even terribly big - small enough that some partitions could be empty. When I run foo.first, I get workers disconnecting and applications die. When I run foo.mapPartitions.saveAsHadoopDataset, it works fine. Anyone got an explanation for why that might be? -Thanks, Nathan -- Nathan Kronenfeld Senior Visualization Developer Oculus Info Inc 2 Berkeley Street, Suite 600, Toronto, Ontario M5A 4J5 Phone: +1-416-203-3003 x 238 Email: nkronenf...@oculusinfo.com
RE: Hive From Spark
Hi Sean, Thanks for clarifying. I re-read SPARK-2420 and now have a better understanding. From a user perspective, what would you recommend for building Spark with Hive 0.12 / 0.13+ libraries moving forward and deploying to a production cluster that runs on an older version of Hadoop (e.g. 2.2 or 2.4)? My concern is that there's going to be a lag in technology adoption, and since Spark is moving fast, its libraries may always be newer. Protobuf is one good example (shading). From a business point of view, if there is no benefit to upgrading the library, the chances that this will happen with a higher priority are low, due to stability concerns and re-running the entire test suite. Just by observation, there are still a lot of people running Hadoop 2.2 instead of 2.4 or 2.5, and releases and upgrades depend on other big players such as Cloudera, Hortonworks, etc. for their distros. Not to mention the process of upgrading. Is there any benefit to using Guava 14 in Spark? I believe there is usually some compelling reason why Spark chose Guava 14; however, I'm not sure if anyone raised that in the conversation, so I don't know if that is necessary. Looking forward to seeing Hive on Spark work soon. Please let me know if there's any help or feedback I can provide. Thanks Sean. From: so...@cloudera.com Date: Mon, 21 Jul 2014 18:36:10 +0100 Subject: Re: Hive From Spark To: user@spark.apache.org I haven't seen anyone actively 'unwilling' -- I hope not. See discussion at https://issues.apache.org/jira/browse/SPARK-2420 where I sketch what a downgrade means. I think it just hasn't gotten a looking over. Contrary to what I thought earlier, the conflict does in fact cause problems in theory, and you show it causes a problem in practice. Not to mention it causes issues for Hive-on-Spark now. On Mon, Jul 21, 2014 at 6:27 PM, Andrew Lee alee...@hotmail.com wrote: Hive and Hadoop are using an older version of guava libraries (11.0.1) where Spark Hive is using guava 14.0.1+. The community isn't willing to downgrade to 11.0.1 which is the current version for Hadoop 2.2 and Hive 0.12.
Re: Spark job tracker.
You can upload your own log4j.properties using spark-submit's --files argument. On Tue, Jul 22, 2014 at 12:45 PM, abhiguruvayya sharath.abhis...@gmail.com wrote: I fixed the error with the yarn-client mode issue which i mentioned in my earlier post. Now i want to edit the log4j.properties to filter some of the unnecessary logs. Can you let me know where can i find this properties file. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-job-tracker-tp8367p10433.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- Marcelo
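For reference, a minimal sketch of what that submit command might look like (class name, master, and jar are placeholders):

  ./bin/spark-submit --class com.example.MyApp \
    --master yarn-client \
    --files /local/path/to/log4j.properties \
    my-app.jar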
RE: Transforming flume events using Spark transformation functions
I tried to map SparkFlumeEvents to an RDD of Strings like below. But that map and call are not executed at all. I might be doing this in a wrong way. Any help would be appreciated.

  flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
    @Override
    public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
      System.out.println("Inside for each...call");
      JavaRDD<String> records = eventsData.map(
        new Function<SparkFlumeEvent, String>() {
          @Override
          public String call(SparkFlumeEvent flume) throws Exception {
            String logRecord = null;
            AvroFlumeEvent avroEvent = null;
            ByteBuffer bytePayload = null;
            System.out.println("Inside Map..call");
            /* List<SparkFlumeEvent> events = flume.collect();
               Iterator<SparkFlumeEvent> batchedEvents = events.iterator();
               SparkFlumeEvent flumeEvent = batchedEvents.next(); */
            avroEvent = flume.event();
            bytePayload = avroEvent.getBody();
            logRecord = new String(bytePayload.array());
            System.out.println("Record is " + logRecord);
            return logRecord;
          }
        });
      return null;
    }
  });

-Original Message- From: Sundaram, Muthu X. [mailto:muthu.x.sundaram@sabre.com] Sent: Tuesday, July 22, 2014 10:24 AM To: user@spark.apache.org; d...@spark.incubator.apache.org Subject: Transforming flume events using Spark transformation functions Hi All, I am getting events from flume using the following line:

  JavaDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port);

Each event is a delimited record. I would like to use some of the transformation functions like map and reduce on this. Do I need to convert the JavaDStream<SparkFlumeEvent> to JavaDStream<String> or can I apply these functions directly on this? I need to do the following kind of operations: AA YDelta TAA Southwest AA Unique tickets are , Y, , , . Count is 2, 1, T 1 and so on... AA - 2 tickets (should not count the duplicate), Delta - 1 ticket, Southwest - 1 ticket. I have to do transformations like this. Right now I am able to receive records. But I am struggling to transform them using spark transformation functions since they are not of type JavaRDD<String>. Can I create a new JavaRDD<String>? How do I create a new JavaRDD? I loop through the events like below:

  flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
    @Override
    public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
      String logRecord = null;
      List<SparkFlumeEvent> events = eventsData.collect();
      Iterator<SparkFlumeEvent> batchedEvents = events.iterator();
      long t1 = System.currentTimeMillis();
      AvroFlumeEvent avroEvent = null;
      ByteBuffer bytePayload = null;
      // All the user level data is carried as payload in the Flume Event
      while (batchedEvents.hasNext()) {
        SparkFlumeEvent flumeEvent = batchedEvents.next();
        avroEvent = flumeEvent.event();
        bytePayload = avroEvent.getBody();
        logRecord = new String(bytePayload.array());
        System.out.println("LOG RECORD = " + logRecord);
      }

Where do I create the new JavaRDD<String>? Do I do it before this loop? How do I create this JavaRDD<String>? In the loop I am able to get every record and I am able to print them. I appreciate any help here. Thanks, Muthu
Re: Spark job tracker.
Thanks, I am able to load the file now. Can I turn off specific logs using log4j.properties? I don't want to see the logs below. How can I do this?

14/07/22 14:01:24 INFO scheduler.TaskSetManager: Starting task 2.0:129 as TID 129 on executor 3: ** (NODE_LOCAL)
14/07/22 14:01:24 INFO scheduler.TaskSetManager: Serialized task 2.0:129 as 14708 bytes in 0 ms

*current log4j.properties entry:*

  # make a file appender and a console appender
  # Print the date in ISO 8601 format
  log4j.appender.myConsoleAppender=org.apache.log4j.ConsoleAppender
  log4j.appender.myConsoleAppender.layout=org.apache.log4j.PatternLayout
  log4j.appender.myConsoleAppender.layout.ConversionPattern=%d [%t] %-5p %c - %m%n
  log4j.appender.myFileAppender=org.apache.log4j.RollingFileAppender
  log4j.appender.myFileAppender.File=spark.log
  log4j.appender.myFileAppender.layout=org.apache.log4j.PatternLayout
  log4j.appender.myFileAppender.layout.ConversionPattern=%d [%t] %-5p %c - %m%n
  # By default, everything goes to console and file
  log4j.rootLogger=INFO, myConsoleAppender, myFileAppender
  # The noisier spark logs go to file only
  log4j.logger.spark.storage=INFO, myFileAppender
  log4j.additivity.spark.storage=false
  log4j.logger.spark.scheduler=INFO, myFileAppender
  log4j.additivity.spark.scheduler=false
  log4j.logger.spark.CacheTracker=INFO, myFileAppender
  log4j.additivity.spark.CacheTracker=false
  log4j.logger.spark.CacheTrackerActor=INFO, myFileAppender
  log4j.additivity.spark.CacheTrackerActor=false
  log4j.logger.spark.MapOutputTrackerActor=INFO, myFileAppender
  log4j.additivity.spark.MapOutputTrackerActor=false
  log4j.logger.spark.MapOutputTracker=INFO, myFileAppender
  log4j.additivity.spark.MapOutputTracker=false

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-job-tracker-tp8367p10440.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Apache kafka + spark + Parquet
Now we are storing data directly from Kafka to Parquet. We are currently using Camus and wanted to know how you went about storing to Parquet? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Apache-kafka-spark-Parquet-tp10037p10441.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark job tracker.
The spark log classes are based on the actual class names. So if you want to filter out a package's logs you need to specify the full package name (e.g. org.apache.spark.storage instead of just spark.storage). On Tue, Jul 22, 2014 at 2:07 PM, abhiguruvayya sharath.abhis...@gmail.com wrote: Thanks i am able to load the file now. Can i turn off specific logs using log4j.properties. I don't want to see the below logs. How can i do this. 14/07/22 14:01:24 INFO scheduler.TaskSetManager: Starting task 2.0:129 as TID 129 on executor 3: ** (NODE_LOCAL) 14/07/22 14:01:24 INFO scheduler.TaskSetManager: Serialized task 2.0:129 as 14708 bytes in 0 ms *current log4j.properties entry:* # make a file appender and a console appender # Print the date in ISO 8601 format log4j.appender.myConsoleAppender=org.apache.log4j.ConsoleAppender log4j.appender.myConsoleAppender.layout=org.apache.log4j.PatternLayout log4j.appender.myConsoleAppender.layout.ConversionPattern=%d [%t] %-5p %c - %m%n log4j.appender.myFileAppender=org.apache.log4j.RollingFileAppender log4j.appender.myFileAppender.File=spark.log log4j.appender.myFileAppender.layout=org.apache.log4j.PatternLayout log4j.appender.myFileAppender.layout.ConversionPattern=%d [%t] %-5p %c - %m%n # By default, everything goes to console and file log4j.rootLogger=INFO, myConsoleAppender, myFileAppender # The noisier spark logs go to file only log4j.logger.spark.storage=INFO, myFileAppender log4j.additivity.spark.storage=false log4j.logger.spark.scheduler=INFO, myFileAppender log4j.additivity.spark.scheduler=false log4j.logger.spark.CacheTracker=INFO, myFileAppender log4j.additivity.spark.CacheTracker=false log4j.logger.spark.CacheTrackerActor=INFO, myFileAppender log4j.additivity.spark.CacheTrackerActor=false log4j.logger.spark.MapOutputTrackerActor=INFO, myFileAppender log4j.additivity.spark.MapOutputTrackerActor=false log4j.logger.spark.MapOutputTracker=INFO, myFileAppender log4j.additivty.spark.MapOutputTracker=false -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-job-tracker-tp8367p10440.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- Marcelo
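Applied to the entries above, that would look roughly like this (a sketch; raising TaskSetManager to WARN is one way to drop the two INFO lines quoted earlier):

  log4j.logger.org.apache.spark.storage=INFO, myFileAppender
  log4j.additivity.org.apache.spark.storage=false
  log4j.logger.org.apache.spark.scheduler=INFO, myFileAppender
  log4j.additivity.org.apache.spark.scheduler=false
  # or silence the task-launch messages entirely:
  log4j.logger.org.apache.spark.scheduler.TaskSetManager=WARN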
How to do an interactive Spark SQL
Hi guys, I'm able to run some Spark SQL examples but the sql is static in the code. I would like to know whether there is a way to read the sql from somewhere else (a shell, for example). I could read the sql statement from kafka/zookeeper, but I cannot share the sql with all workers; broadcast doesn't seem to work for updating values. Moreover, if I use some non-serializable class (DataInputStream etc.) to read the sql from another source, I always get Task not serializable: java.io.NotSerializableException Best, Siyuan
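As a side note, for a plain (non-streaming) job the query text only has to exist on the driver: sqlContext.sql(...) is evaluated there and Spark ships the resulting plan to the workers, so the string itself never needs to be broadcast. A minimal sketch, assuming the tables have already been registered with registerAsTable and the query is typed into the shell that launched the job:

  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  // read one query string on the driver, e.g. from standard input
  val query = scala.io.Source.stdin.getLines().next()
  sqlContext.sql(query).collect().foreach(println)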
Re: How to do an interactive Spark SQL
Do you mean that the text of the SQL queries is hardcoded in the code? What do you mean by cannot share the sql to all workers? On Tue, Jul 22, 2014 at 4:03 PM, hsy...@gmail.com hsy...@gmail.com wrote: Hi guys, I'm able to run some Spark SQL example but the sql is static in the code. I would like to know is there a way to read sql from somewhere else (shell for example) I could read sql statement from kafka/zookeeper, but I cannot share the sql to all workers. broadcast seems not working for updating values. Moreover if I use some non-serializable class(DataInputStream etc) to read sql from other source, I always get Task not serializable: java.io.NotSerializableException Best, Siyuan
Spark clustered client
Hi Folks, I have been trying to dig up some information about the possibilities for deploying more than one client process that consumes Spark. Let's say I have a Spark cluster of 10 servers, and I would like to set up 2 additional servers which send requests to it through a Spark context, referencing one specific file of 1TB of data. Each client process has its own SparkContext instance. Currently, the result is that the same file is loaded into memory twice, because the Spark context resources are not shared between processes/JVMs. I wouldn't like to have that same file loaded over and over again with every new client being introduced. What would be the best practice here? Am I missing something? Thank you, Asaf
Re: How to do an interactive Spark SQL
Sorry, typo. What I mean is sharing. If the sql is changing at runtime, how do I broadcast the sql to all workers that is doing sql analysis. Best, Siyuan On Tue, Jul 22, 2014 at 4:15 PM, Zongheng Yang zonghen...@gmail.com wrote: Do you mean that the texts of the SQL queries being hardcoded in the code? What do you mean by cannot shar the sql to all workers? On Tue, Jul 22, 2014 at 4:03 PM, hsy...@gmail.com hsy...@gmail.com wrote: Hi guys, I'm able to run some Spark SQL example but the sql is static in the code. I would like to know is there a way to read sql from somewhere else (shell for example) I could read sql statement from kafka/zookeeper, but I cannot share the sql to all workers. broadcast seems not working for updating values. Moreover if I use some non-serializable class(DataInputStream etc) to read sql from other source, I always get Task not serializable: java.io.NotSerializableException Best, Siyuan
Re: How to do an interactive Spark SQL
Can you paste a small code example to illustrate your questions? On Tue, Jul 22, 2014 at 5:05 PM, hsy...@gmail.com hsy...@gmail.com wrote: Sorry, typo. What I mean is sharing. If the sql is changing at runtime, how do I broadcast the sql to all workers that is doing sql analysis. Best, Siyuan On Tue, Jul 22, 2014 at 4:15 PM, Zongheng Yang zonghen...@gmail.com wrote: Do you mean that the texts of the SQL queries being hardcoded in the code? What do you mean by cannot shar the sql to all workers? On Tue, Jul 22, 2014 at 4:03 PM, hsy...@gmail.com hsy...@gmail.com wrote: Hi guys, I'm able to run some Spark SQL example but the sql is static in the code. I would like to know is there a way to read sql from somewhere else (shell for example) I could read sql statement from kafka/zookeeper, but I cannot share the sql to all workers. broadcast seems not working for updating values. Moreover if I use some non-serializable class(DataInputStream etc) to read sql from other source, I always get Task not serializable: java.io.NotSerializableException Best, Siyuan
Re: the implications of some items in webUI
On Tue, Jul 22, 2014 at 7:08 AM, Yifan LI iamyifa...@gmail.com wrote: 1) what is the difference between Duration(Stages - Completed Stages) and Task Time(Executors) ? Stages are composed of tasks that run on executors. Tasks within a stage may run concurrently, since there are multiple executors and each executor may run more than one task at a time. An executor's task time is the sum of the durations of all of its tasks. Because this is a simple sum, it does not take parallelism into account: if an executor runs 8 tasks concurrently and each takes a minute, it has only spent one minute of wallclock time, but the reported task time will be 8 minutes. A stage's duration is how much wallclock time elapsed between when the first task launched and when the last task finished. This does take parallelism into account, so in the above example the stage duration would be 1 minute. 2) what are the exact meanings of Shuffle Read/Shuffle Write? Stages communicate using shuffles. Each task may start by reading shuffle inputs across the network, and may end by writing shuffle outputs to disk locally. See page 7 of the Spark NSDI paper https://www.usenix.org/system/files/conference/nsdi12/nsdi12-final138.pdf for details. Shuffle read and shuffle write refer to the total amount of data that a stage read across the network and wrote to disk. Ankur http://www.ankurdave.com/
RE: Joining by timestamp.
Thanks Chen -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Joining-by-timestamp-tp10367p10449.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
How could I start new spark cluster with hadoop2.0.2
Hi, I am trying to create a spark cluster using the spark-ec2 file under the spark1.0.1 directory. 1) I noticed that it always creates hadoop version 1.0.4. Is there a way I can override that? I would like to have hadoop2.0.2. 2) I also want to install Oozie along with it. Are there any scripts available along with spark-ec2 which can create oozie instances for me? Thanks, D. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-could-I-start-new-spark-cluster-with-hadoop2-0-2-tp10450.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
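On the Hadoop version question: I believe the spark-ec2 script shipped with 1.0.x has a --hadoop-major-version option (default 1); whether setting it to 2 gives you exactly the 2.0.2 build you want is something to verify against the script itself. A sketch of the invocation (key pair, identity file, and cluster name are placeholders):

  ./ec2/spark-ec2 --key-pair=my-key --identity-file=my-key.pem \
    --hadoop-major-version=2 launch my-cluster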
Re: How to do an interactive Spark SQL
For example, this is what I tested and it works in local mode. What it does is get both the data and the sql query from kafka, run the sql on each RDD, and output the result back to kafka again. I defined a var called sqlS. In the streaming part, as you can see, I change the sql statement if it consumes a sql message from kafka; then the next time you do sql(sqlS) it executes the updated sql query. But this code doesn't work in a cluster, because sqlS is not updated on all the workers, from what I understand. So my question is: how do I change the sqlS value at runtime and make all the workers pick up the latest value?

  var sqlS = "select count(*) from records"
  val Array(zkQuorum, group, topic, sqltopic, outputTopic, numParts) = args
  val sparkConf = new SparkConf().setAppName("KafkaSpark")
  val sc = new SparkContext(sparkConf)
  val ssc = new StreamingContext(sc, Seconds(2))
  val sqlContext = new SQLContext(sc)
  // Importing the SQL context gives access to all the SQL functions and implicit conversions.
  import sqlContext._
  import sqlContext.createSchemaRDD
  //val tt = Time(5000)
  val topicpMap = collection.immutable.HashMap(topic -> numParts.toInt, sqltopic -> 2)
  val recordsStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap)
    .window(Seconds(4))
    .filter(t => { if (t._1 == "sql") { sqlS = t._2; false } else true })
    .map(t => getRecord(t._2.split("#")))
  val zkClient = new ZkClient(zkQuorum, 3, 3, ZKStringSerializer)
  val brokerString = ZkUtils.getAllBrokersInCluster(zkClient).map(_.getConnectionString).mkString(",")
  KafkaSpark.props.put("metadata.broker.list", brokerString)
  val config = new ProducerConfig(KafkaSpark.props)
  val producer = new Producer[String, String](config)
  val result = recordsStream.foreachRDD((recRDD) => {
    val schemaRDD = sqlContext.createSchemaRDD(recRDD)
    schemaRDD.registerAsTable(tName)
    val result = sql(sqlS).collect.foldLeft("Result:\n")((s, r) => { s + r.mkString(",") + "\n" })
    producer.send(new KeyedMessage[String, String](outputTopic, s"SQL: $sqlS \n $result"))
  })
  ssc.start()
  ssc.awaitTermination()

On Tue, Jul 22, 2014 at 5:10 PM, Zongheng Yang zonghen...@gmail.com wrote: Can you paste a small code example to illustrate your questions? On Tue, Jul 22, 2014 at 5:05 PM, hsy...@gmail.com hsy...@gmail.com wrote: Sorry, typo. What I mean is sharing. If the sql is changing at runtime, how do I broadcast the sql to all workers that is doing sql analysis. Best, Siyuan On Tue, Jul 22, 2014 at 4:15 PM, Zongheng Yang zonghen...@gmail.com wrote: Do you mean that the texts of the SQL queries being hardcoded in the code? What do you mean by cannot shar the sql to all workers? On Tue, Jul 22, 2014 at 4:03 PM, hsy...@gmail.com hsy...@gmail.com wrote: Hi guys, I'm able to run some Spark SQL example but the sql is static in the code. I would like to know is there a way to read sql from somewhere else (shell for example) I could read sql statement from kafka/zookeeper, but I cannot share the sql to all workers. broadcast seems not working for updating values. Moreover if I use some non-serializable class(DataInputStream etc) to read sql from other source, I always get Task not serializable: java.io.NotSerializableException Best, Siyuan
Re: How to do an interactive Spark SQL
Hi, as far as I know, after the Streaming Context has started, the processing pipeline (e.g., filter.map.join.filter) cannot be changed. As your SQL statement is transformed into RDD operations when the Streaming Context starts, I think there is no way to change the statement that is executed on the current stream after the StreamingContext has started. Tobias On Wed, Jul 23, 2014 at 9:55 AM, hsy...@gmail.com hsy...@gmail.com wrote: For example, this is what I tested and work on local mode, what it does is it get data and sql query both from kafka and do sql on each RDD and output the result back to kafka again I defined a var called *sqlS. * In the streaming part as you can see I change the sql statement if it consumes a sql message from kafka then next time when you do *sql(sqlS) *it execute the updated sql query. But this code doesn't work in cluster because sqlS is not updated on all the workers from what I understand. So my question is how do I change the sqlS value at runtime and make all the workers pick the latest value. *var sqlS = select count(*) from records* val Array(zkQuorum, group, topic, sqltopic, outputTopic, numParts) = args val sparkConf = new SparkConf().setAppName(KafkaSpark) val sc = new SparkContext(sparkConf) val ssc = new StreamingContext(sc, Seconds(2)) val sqlContext = new SQLContext(sc) // Importing the SQL context gives access to all the SQL functions and implicit conversions. import sqlContext._ import sqlContext.createSchemaRDD //val tt = Time(5000) val topicpMap = collection.immutable.HashMap(topic - numParts.toInt, sqltopic - 2) val recordsStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).window(Seconds(4)).filter(t = { if (t._1 == sql) { *sqlS = t._2;* false } else true }).map(t = getRecord(t._2.split(#))) val zkClient = new ZkClient(zkQuorum, 3, 3, ZKStringSerializer) val brokerString = ZkUtils.getAllBrokersInCluster(zkClient).map(_.getConnectionString).mkString(,) KafkaSpark.props.put(metadata.broker.list, brokerString) val config = new ProducerConfig(KafkaSpark.props) val producer = new Producer[String, String](config) val result = recordsStream.foreachRDD((recRDD) = { val schemaRDD = sqlContext.createSchemaRDD(recRDD) schemaRDD.registerAsTable(tName) val result = *sql(sqlS)*.collect.foldLeft(Result:\n)((s, r) = { s + r.mkString(,) + \n }) producer.send(new KeyedMessage[String, String](outputTopic, sSQL: $sqlS \n $result)) }) ssc.start() ssc.awaitTermination() On Tue, Jul 22, 2014 at 5:10 PM, Zongheng Yang zonghen...@gmail.com wrote: Can you paste a small code example to illustrate your questions? On Tue, Jul 22, 2014 at 5:05 PM, hsy...@gmail.com hsy...@gmail.com wrote: Sorry, typo. What I mean is sharing. If the sql is changing at runtime, how do I broadcast the sql to all workers that is doing sql analysis. Best, Siyuan On Tue, Jul 22, 2014 at 4:15 PM, Zongheng Yang zonghen...@gmail.com wrote: Do you mean that the texts of the SQL queries being hardcoded in the code? What do you mean by cannot shar the sql to all workers? On Tue, Jul 22, 2014 at 4:03 PM, hsy...@gmail.com hsy...@gmail.com wrote: Hi guys, I'm able to run some Spark SQL example but the sql is static in the code. I would like to know is there a way to read sql from somewhere else (shell for example) I could read sql statement from kafka/zookeeper, but I cannot share the sql to all workers. broadcast seems not working for updating values. 
Moreover if I use some non-serializable class(DataInputStream etc) to read sql from other source, I always get Task not serializable: java.io.NotSerializableException Best, Siyuan
streaming window not behaving as advertised (v1.0.1)
I have a sample application pumping out records 1 per second. The batch interval is set to 5 seconds. Here’s a list of “observed window intervals” vs what was actually set window=25, slide=25 : observed-window=25, overlapped-batches=0 window=25, slide=20 : observed-window=20, overlapped-batches=0 window=25, slide=15 : observed-window=15, overlapped-batches=0 window=25, slide=10 : observed-window=20, overlapped-batches=2 window=25, slide=5 : observed-window=25, overlapped-batches=3 can someone explain this behavior to me? I’m trying to aggregate metrics by time batches, but want to skip partial batches. Therefore, I’m trying to find a combination which results in 1 overlapped batch, but no combination I tried gets me there. Alan
Re: How to do an interactive Spark SQL
But how do they do the interactive sql in the demo? https://www.youtube.com/watch?v=dJQ5lV5Tldw And if it can work in the local mode. I think it should be able to work in cluster mode, correct? On Tue, Jul 22, 2014 at 5:58 PM, Tobias Pfeiffer t...@preferred.jp wrote: Hi, as far as I know, after the Streaming Context has started, the processing pipeline (e.g., filter.map.join.filter) cannot be changed. As your SQL statement is transformed into RDD operations when the Streaming Context starts, I think there is no way to change the statement that is executed on the current stream after the StreamingContext has started. Tobias On Wed, Jul 23, 2014 at 9:55 AM, hsy...@gmail.com hsy...@gmail.com wrote: For example, this is what I tested and work on local mode, what it does is it get data and sql query both from kafka and do sql on each RDD and output the result back to kafka again I defined a var called *sqlS. * In the streaming part as you can see I change the sql statement if it consumes a sql message from kafka then next time when you do *sql(sqlS) *it execute the updated sql query. But this code doesn't work in cluster because sqlS is not updated on all the workers from what I understand. So my question is how do I change the sqlS value at runtime and make all the workers pick the latest value. *var sqlS = select count(*) from records* val Array(zkQuorum, group, topic, sqltopic, outputTopic, numParts) = args val sparkConf = new SparkConf().setAppName(KafkaSpark) val sc = new SparkContext(sparkConf) val ssc = new StreamingContext(sc, Seconds(2)) val sqlContext = new SQLContext(sc) // Importing the SQL context gives access to all the SQL functions and implicit conversions. import sqlContext._ import sqlContext.createSchemaRDD //val tt = Time(5000) val topicpMap = collection.immutable.HashMap(topic - numParts.toInt, sqltopic - 2) val recordsStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).window(Seconds(4)).filter(t = { if (t._1 == sql) { *sqlS = t._2;* false } else true }).map(t = getRecord(t._2.split(#))) val zkClient = new ZkClient(zkQuorum, 3, 3, ZKStringSerializer) val brokerString = ZkUtils.getAllBrokersInCluster(zkClient).map(_.getConnectionString).mkString(,) KafkaSpark.props.put(metadata.broker.list, brokerString) val config = new ProducerConfig(KafkaSpark.props) val producer = new Producer[String, String](config) val result = recordsStream.foreachRDD((recRDD) = { val schemaRDD = sqlContext.createSchemaRDD(recRDD) schemaRDD.registerAsTable(tName) val result = *sql(sqlS)*.collect.foldLeft(Result:\n)((s, r) = { s + r.mkString(,) + \n }) producer.send(new KeyedMessage[String, String](outputTopic, sSQL: $sqlS \n $result)) }) ssc.start() ssc.awaitTermination() On Tue, Jul 22, 2014 at 5:10 PM, Zongheng Yang zonghen...@gmail.com wrote: Can you paste a small code example to illustrate your questions? On Tue, Jul 22, 2014 at 5:05 PM, hsy...@gmail.com hsy...@gmail.com wrote: Sorry, typo. What I mean is sharing. If the sql is changing at runtime, how do I broadcast the sql to all workers that is doing sql analysis. Best, Siyuan On Tue, Jul 22, 2014 at 4:15 PM, Zongheng Yang zonghen...@gmail.com wrote: Do you mean that the texts of the SQL queries being hardcoded in the code? What do you mean by cannot shar the sql to all workers? On Tue, Jul 22, 2014 at 4:03 PM, hsy...@gmail.com hsy...@gmail.com wrote: Hi guys, I'm able to run some Spark SQL example but the sql is static in the code. 
I would like to know is there a way to read sql from somewhere else (shell for example) I could read sql statement from kafka/zookeeper, but I cannot share the sql to all workers. broadcast seems not working for updating values. Moreover if I use some non-serializable class(DataInputStream etc) to read sql from other source, I always get Task not serializable: java.io.NotSerializableException Best, Siyuan
Where is the PowerGraph abstraction
I downloaded spark 1.0.1, but I cannot find the PowerGraph abstraction mentioned in the GraphX paper. What I can find is the Pregel abstraction. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Where-is-the-PowerGraph-abstraction-tp10457.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark Streaming source from Amazon Kinesis
I will take a look at it tomorrow! TD On Tue, Jul 22, 2014 at 9:30 AM, Chris Fregly ch...@fregly.com wrote: i took this over from parviz. i recently submitted a new PR for Kinesis Spark Streaming support: https://github.com/apache/spark/pull/1434 others have tested it with good success, so give it a whirl! waiting for it to be reviewed/merged. please put any feedback into the PR directly. thanks! -chris On Mon, Apr 21, 2014 at 2:39 PM, Matei Zaharia matei.zaha...@gmail.com wrote: No worries, looking forward to it! Matei On Apr 21, 2014, at 1:59 PM, Parviz Deyhim pdey...@gmail.com wrote: sorry Matei. Will definitely start working on making the changes soon :) On Mon, Apr 21, 2014 at 1:10 PM, Matei Zaharia matei.zaha...@gmail.com wrote: There was a patch posted a few weeks ago ( https://github.com/apache/spark/pull/223), but it needs a few changes in packaging because it uses a license that isn’t fully compatible with Apache. I’d like to get this merged when the changes are made though — it would be a good input source to support. Matei On Apr 21, 2014, at 1:00 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: I'm looking to start experimenting with Spark Streaming, and I'd like to use Amazon Kinesis https://aws.amazon.com/kinesis/ as my data source. Looking at the list of supported Spark Streaming sources http://spark.apache.org/docs/latest/streaming-programming-guide.html#linking, I don't see any mention of Kinesis. Is it possible to use Spark Streaming with Amazon Kinesis? If not, are there plans to add such support in the future? Nick -- View this message in context: Spark Streaming source from Amazon Kinesis http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-source-from-Amazon-Kinesis-tp4550.html Sent from the Apache Spark User List mailing list archive http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com http://nabble.com/.
Re: combineByKey at ShuffledDStream.scala
Can you give an idea of the streaming program? What are the rest of the transformations you are doing on the input streams? On Tue, Jul 22, 2014 at 11:05 AM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi all, I am currently running a Spark Streaming program, which consumes data from Kafka and does a group by operation on the data. I try to optimize the running time of the program because it looks slow to me. It seems the stage named: * combineByKey at ShuffledDStream.scala:42 * always takes the longest running time. And if I open this stage, I only see two executors on this stage. Does anyone have an idea what this stage does and how to increase the speed for this stage? Thanks! Bill
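One thing worth checking while putting that information together: if the combineByKey stage only shows a couple of tasks, the shuffle may simply have too few partitions. A hedged sketch (pairStream and the partition count 32 are placeholders), assuming the stream is a DStream of key/value pairs:

  // spread the shuffle across more tasks by passing an explicit partition count
  val grouped = pairStream.groupByKey(32)

  // when the per-key work is an associative aggregation, reduceByKey combines
  // values map-side before the shuffle and is usually much cheaper than groupByKey
  val counts = pairStream.mapValues(_ => 1L).reduceByKey(_ + _, 32)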
Re: spark1.0.1 spark sql error java.lang.NoClassDefFoundError: Could not initialize class $line11.$read$
It is caused by a bug in the Spark REPL. I still do not know which part of the REPL code causes it... I think the people working on the REPL may have a better idea. Regarding how I found it: based on the exception, it seems we pulled in some irrelevant stuff, and that import was pretty suspicious. Thanks, Yin On Tue, Jul 22, 2014 at 12:53 AM, Victor Sheng victorsheng...@gmail.com wrote: Hi, Yin Huai I tested again with your snippet code. It works well in spark-1.0.1 Here is my code:

  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  case class Record(data_date: String, mobile: String, create_time: String)
  val mobile = Record("2014-07-20", "1234567", "2014-07-19")
  val lm = List(mobile)
  val mobileRDD = sc.makeRDD(lm)
  val mobileSchemaRDD = sqlContext.createSchemaRDD(mobileRDD)
  mobileSchemaRDD.registerAsTable("mobile")
  sqlContext.sql("select count(1) from mobile").collect()

The result is like below: 14/07/22 15:49:53 INFO spark.SparkContext: Job finished: collect at SparkPlan.scala:52, took 0.296864832 s res9: Array[org.apache.spark.sql.Row] = Array([1]) But what is the main cause of this exception? And how did you find it out by looking at some unknown characters like $line11.$read$ $line12.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$ ? Thanks, Victor -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark1-0-1-spark-sql-error-java-lang-NoClassDefFoundError-Could-not-initialize-class-line11-read-tp10135p10390.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: streaming window not behaving as advertised (v1.0.1)
It could be related to this bug that is currently open. https://issues.apache.org/jira/browse/SPARK-1312 Here is a workaround: can you put an inputStream.foreachRDD(rdd => { }) and try these combos again? TD On Tue, Jul 22, 2014 at 6:01 PM, Alan Ngai a...@opsclarity.com wrote: I have a sample application pumping out records 1 per second. The batch interval is set to 5 seconds. Here’s a list of “observed window intervals” vs what was actually set window=25, slide=25 : observed-window=25, overlapped-batches=0 window=25, slide=20 : observed-window=20, overlapped-batches=0 window=25, slide=15 : observed-window=15, overlapped-batches=0 window=25, slide=10 : observed-window=20, overlapped-batches=2 window=25, slide=5 : observed-window=25, overlapped-batches=3 can someone explain this behavior to me? I’m trying to aggregate metrics by time batches, but want to skip partial batches. Therefore, I’m trying to find a combination which results in 1 overlapped batch, but no combination I tried gets me there. Alan
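In code, the suggested workaround would look roughly like this (a sketch; inputStream stands for whatever DStream the records come from):

  // no-op output operation on the raw stream, to work around SPARK-1312
  inputStream.foreachRDD(rdd => {})

  // then window as before, e.g. a 25s window sliding every 5s
  val windowed = inputStream.window(Seconds(25), Seconds(5))
  windowed.foreachRDD(rdd => println("window count: " + rdd.count()))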
Re: Transforming flume events using Spark transformation functions
This is because of the RDD's lazy evaluation! Unless you force a transformed (mapped/filtered/etc.) RDD to give you back some data (like RDD.count) or output the data (like RDD.saveAsTextFile()), Spark will not do anything. So after the eventData.map(...), if you do take(10) and then print the result, you should see 10 items from each batch printed. Also you can do the same map operation on the DStream as well. FYI. inputDStream.map(...).foreachRDD(...) is equivalent to inputDStream.foreachRDD( // call rdd.map(...) ) Either way you have to call some RDD action (count, collect, take, saveAsHadoopFile, etc.) that asks the system to do something concrete with the data. TD On Tue, Jul 22, 2014 at 1:55 PM, Sundaram, Muthu X. muthu.x.sundaram@sabre.com wrote: I tried to map SparkFlumeEvents to String of RDDs like below. But that map and call are not at all executed. I might be doing this in a wrong way. Any help would be appreciated. flumeStream.foreach(new FunctionJavaRDDSparkFlumeEvent,Void () { @Override public Void call(JavaRDDSparkFlumeEvent eventsData) throws Exception { System.out.println(Inside for each...call); JavaRDDString records = eventsData.map( new FunctionSparkFlumeEvent, String() { @Override public String call(SparkFlumeEvent flume) throws Exception { String logRecord = null; AvroFlumeEvent avroEvent = null; ByteBuffer bytePayload = null; System.out.println(Inside Map..call); /* ListSparkFlumeEvent events = flume.collect(); IteratorSparkFlumeEvent batchedEvents = events.iterator(); SparkFlumeEvent flumeEvent = batchedEvents.next();*/ avroEvent = flume.event(); bytePayload = avroEvent.getBody(); logRecord = new String(bytePayload.array()); System.out.println(Record is + logRecord); return logRecord; } }); return null; } -Original Message- From: Sundaram, Muthu X. [mailto:muthu.x.sundaram@sabre.com] Sent: Tuesday, July 22, 2014 10:24 AM To: user@spark.apache.org; d...@spark.incubator.apache.org Subject: Tranforming flume events using Spark transformation functions Hi All, I am getting events from flume using following line. JavaDStreamSparkFlumeEvent flumeStream = FlumeUtils.createStream(ssc, host, port); Each event is a delimited record. I like to use some of the transformation functions like map and reduce on this. Do I need to convert the JavaDStreamSparkFlumeEvent to JavaDStreamString or can I apply these function directly on this? I need to do following kind of operations AA YDelta TAA Southwest AA Unique tickets are , Y, , , . Count is 2, 1, T 1 and so on... AA - 2 tickets(Should not count the duplicate), Delta - 1 ticket, Southwest - 1 ticket. I have to do transformations like this. Right now I am able to receives records. But I am struggling to transform them using spark transformation functions since they are not of type JavaRDDString. Can I create new JavaRDDString? How do I create new JavaRDD? 
I loop through the events like below flumeStream.foreach(new FunctionJavaRDDSparkFlumeEvent,Void () { @Override public Void call(JavaRDDSparkFlumeEvent eventsData) throws Exception { String logRecord = null; ListSparkFlumeEvent events = eventsData.collect(); IteratorSparkFlumeEvent batchedEvents = events.iterator(); long t1 = System.currentTimeMillis(); AvroFlumeEvent avroEvent = null; ByteBuffer bytePayload = null; // All the user level data is carried as payload in Flume Event while(batchedEvents.hasNext()) { SparkFlumeEvent flumeEvent = batchedEvents.next(); avroEvent = flumeEvent.event(); bytePayload = avroEvent.getBody(); logRecord = new String(bytePayload.array()); System.out.println(LOG RECORD = + logRecord); } Where do I create new JavaRDDString? DO I do it before this loop? How do I create this JavaRDDString? In the loop I am able to get every record and I am able to print them. I appreciate any help here. Thanks, Muthu
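For completeness, a rough Scala sketch of the same idea (map the payload, then force it with an action); host and port are placeholders, and the field access follows the Java code above:

  import org.apache.spark.streaming.flume.FlumeUtils

  val flumeStream = FlumeUtils.createStream(ssc, host, port)

  flumeStream.foreachRDD { rdd =>
    // map each event payload to a String...
    val records = rdd.map(e => new String(e.event.getBody.array()))
    // ...and call an action so the map above actually executes
    records.take(10).foreach(println)
  }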
Re: Caching issue with msg: RDD block could not be dropped from memory as it does not exist
Hello Andrew, Thank you very much for your great tips. Your solution worked perfectly. In fact, I was not aware that the right option for local mode is --driver-memory 1g Cheers, Rindra On Mon, Jul 21, 2014 at 11:23 AM, Andrew Or-2 [via Apache Spark User List] ml-node+s1001560n10336...@n3.nabble.com wrote: Hi Rindra, Depending on what you're doing with your groupBy, you may end up inflating your data quite a bit. Even if your machine has 16G, by default spark-shell only uses 512M, and the amount used for storing blocks is only 60% of that (spark.storage.memoryFraction), so this space becomes ~300M. This is still many multiples of the size of your dataset, but not by orders of magnitude. If you are running Spark 1.0+, you can increase the amount of memory used by spark-shell by adding --driver-memory 1g as a command line argument in local mode, or --executor-memory 1g in any other mode. (Also, it seems that you set your log level to WARN. The cause is most probably because the cache is not big enough, but setting the log level to INFO will provide you with more information on the exact sizes that are being used by the storage and the blocks). Andrew 2014-07-19 13:01 GMT-07:00 rindra [hidden email]: Hi, I am working with a small dataset about 13Mbyte on the spark-shell. After doing a groupBy on the RDD, I wanted to cache RDD in memory but I keep getting these warnings: scala rdd.cache() res28: rdd.type = MappedRDD[63] at repartition at console:28 scala rdd.count() 14/07/19 12:45:18 WARN BlockManager: Block rdd_63_82 could not be dropped from memory as it does not exist 14/07/19 12:45:18 WARN BlockManager: Putting block rdd_63_82 failed 14/07/19 12:45:18 WARN BlockManager: Block rdd_63_40 could not be dropped from memory as it does not exist 14/07/19 12:45:18 WARN BlockManager: Putting block rdd_63_40 failed res29: Long = 5 It seems that I could not cache the data in memory even though my local machine has 16Gb RAM and the data is only 13MB with 100 partitions size. How to prevent this caching issue from happening? Thanks. Rindra -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Caching-issue-with-msg-RDD-block-could-not-be-dropped-from-memory-as-it-does-not-exist-tp10248.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Caching-issue-with-msg-RDD-block-could-not-be-dropped-from-memory-as-it-does-not-exist-tp10248p10463.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
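For anyone else hitting this in local mode, the invocation is simply (Spark 1.0+):

  ./bin/spark-shell --driver-memory 1g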
RE: Executor metrics in spark application
As far as I understand, even if I could register the custom source, there is no way to have a cluster-wide variable to pass to it, i.e. the accumulator can be modified by tasks, but only the driver can read it, and the broadcast value is constant. So it seems this custom metrics/sinks functionality is not really thought out by the developers. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Executor-metrics-in-spark-application-tp188p10464.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: akka disassociated on GC
Hi Xiangrui, By using your treeAggregate and broadcast patch, the evaluation has been processed successfully. I expect that these patches are merged in the next major release (v1.1?). Without them, it would be hard to use mllib for a large dataset. Thanks, Makoto (2014/07/16 15:05), Xiangrui Meng wrote: Hi Makoto, I don't remember I wrote that but thanks for bringing this issue up! There are two important settings to check: 1) driver memory (you can see it from the executor tab), 2) number of partitions (try to use small number of partitions). I put two PRs to fix the problem: 1) use broadcast in task closure: https://github.com/apache/spark/pull/1427 2) use treeAggregate to get the result: https://github.com/apache/spark/pull/1110 They are still under review. Once merged, the problem should be fixed. I will test the KDDB dataset and report back. Thanks! Best, Xiangrui On Tue, Jul 15, 2014 at 10:48 PM, Makoto Yui yuin...@gmail.com wrote: Hello, (2014/06/19 23:43), Xiangrui Meng wrote: The execution was slow for more large KDD cup 2012, Track 2 dataset (235M+ records of 16.7M+ (2^24) sparse features in about 33.6GB) due to the sequential aggregation of dense vectors on a single driver node. It took about 7.6m for aggregation for an iteration. When running the above test, I got another error at the beginning of the 2nd iteration when enabling iterations. It works fine for the first iteration but the 2nd iteration always fails. It seems that akka connections are suddenly disassociated when GC happens on the driver node. Two possible causes can be considered: 1) The driver is under a heavy load because of GC; so executors cannot connect to the driver. Changing akka timeout setting did not resolve the issue. 2) akka oddly released valid connections on GC. I'm using spark 1.0.1 and timeout setting of akka as follows did not resolve the problem. [spark-defaults.conf] spark.akka.frameSize 50 spark.akka.timeout 120 spark.akka.askTimeout120 spark.akka.lookupTimeout 120 spark.akka.heartbeat.pauses 600 It seems this issue is related to one previously discussed in http://markmail.org/message/p2i34frtf4iusdfn Are there any preferred configurations or workaround for this issue? 
Thanks, Makoto [The error log of the driver] 14/07/14 18:11:32 INFO scheduler.TaskSetManager: Serialized task 4.0:117 as 25300254 bytes in 35 ms 666.108: [GC [PSYoungGen: 6540914K-975362K(7046784K)] 12419091K-7792529K(23824000K), 5.2157830 secs] [Times: user=0.00 sys=68.43, real=5.22 secs] 14/07/14 18:11:38 INFO network.ConnectionManager: Removing SendingConnection to ConnectionManagerId(dc09.mydomain.org,34565) 14/07/14 18:11:38 INFO network.ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(dc09.mydomain.org,34565) 14/07/14 18:11:38 INFO client.AppClient$ClientActor: Executor updated: app-20140714180032-0010/8 is now EXITED (Command exited with code 1) 14/07/14 18:11:38 ERROR network.ConnectionManager: Corresponding SendingConnectionManagerId not found 14/07/14 18:11:38 INFO cluster.SparkDeploySchedulerBackend: Executor app-20140714180032-0010/8 removed: Command exited with code 1 14/07/14 18:11:38 INFO network.ConnectionManager: Removing SendingConnection to ConnectionManagerId(dc30.mydomain.org,59016) 14/07/14 18:11:38 INFO network.ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(dc30.mydomain.org,59016) 14/07/14 18:11:38 ERROR network.ConnectionManager: Corresponding SendingConnectionManagerId not found 672.596: [GC [PSYoungGen: 6642785K-359202K(6059072K)] 13459952K-8065935K(22836288K), 2.8260220 secs] [Times: user=2.83 sys=33.72, real=2.83 secs] 14/07/14 18:11:41 INFO network.ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(dc03.mydomain.org,43278) 14/07/14 18:11:41 INFO network.ConnectionManager: Removing SendingConnection to ConnectionManagerId(dc03.mydomain.org,43278) 14/07/14 18:11:41 INFO network.ConnectionManager: Removing SendingConnection to ConnectionManagerId(dc02.mydomain.org,54538) 14/07/14 18:11:41 INFO network.ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(dc18.mydomain.org,58100) 14/07/14 18:11:41 INFO network.ConnectionManager: Removing SendingConnection to ConnectionManagerId(dc18.mydomain.org,58100) 14/07/14 18:11:41 INFO network.ConnectionManager: Removing SendingConnection to ConnectionManagerId(dc18.mydomain.org,58100) The full log is uploaded on https://dl.dropboxusercontent.com/u/13123103/driver.log [The error log of a worker] 14/07/14 18:11:38 INFO worker.Worker: Executor app-20140714180032-0010/8 finished with state EXITED message Command exited with code 1 exitStatus 1 14/07/14 18:11:38 INFO actor.LocalActorRef: Message [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from Actor[akka://sparkWorker/deadLetters] to