Re: Topic Modelling - LDA
Hi Subshiri, You may find these 2 blog posts useful: https://databricks.com/blog/2015/03/25/topic-modeling-with-lda-mllib-meets-graphx.html https://databricks.com/blog/2015/09/22/large-scale-topic-modeling-improvements-to-lda-on-spark.html On Tue, Sep 22, 2015 at 11:54 PM, Subshiri S wrote: > Hi, > I am experimenting with Spark LDA. > How do I create a Topic Model for Prediction in Spark? > How do I evaluate the topics modelled in Spark? > > Could you point me to some examples? > > Regards, > Subshiri
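For reference, a minimal sketch of training and inspecting an LDA model with MLlib (Spark 1.3+). The input path is hypothetical, and it assumes each document has already been turned into a term-count vector:

import org.apache.spark.mllib.clustering.LDA
import org.apache.spark.mllib.linalg.Vectors

// One document per line, each line a space-separated term-count vector (assumed format).
val corpus = sc.textFile("hdfs:///path/to/term_counts")
  .map(line => Vectors.dense(line.split(" ").map(_.toDouble)))
  .zipWithIndex()
  .map { case (vec, id) => (id, vec) }
  .cache()

// Train a 10-topic model.
val ldaModel = new LDA().setK(10).setMaxIterations(50).run(corpus)

// Print the top 10 terms (as vocabulary indices) and their weights per topic.
ldaModel.describeTopics(maxTermsPerTopic = 10).zipWithIndex.foreach {
  case ((termIndices, weights), topic) =>
    println(s"Topic $topic: " + termIndices.zip(weights).mkString(" "))
}

For evaluating topics, the DistributedLDAModel returned by run() exposes logLikelihood and logPrior on the training data, which can serve as a rough goodness-of-fit measure.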
Re: SparkSQL concerning materials
Have you seen the Spark SQL paper?: https://people.csail.mit.edu/matei/papers/2015/sigmod_spark_sql.pdf On Thu, Aug 20, 2015 at 11:35 PM, Dawid Wysakowicz wysakowicz.da...@gmail.com wrote: Hi, thanks for the answers. I have read the answers you provided, but I am rather looking for some materials on the internals, e.g. how the optimizer works, how the query is translated into RDD operations, etc. The API I am quite familiar with. A good starting point for me was: Spark DataFrames: Simple and Fast Analysis of Structured Data https://www.brighttalk.com/webcast/12891/166495 2015-08-20 18:29 GMT+02:00 Dhaval Patel dhaval1...@gmail.com: Or if you're a python lover then this is a good place - https://spark.apache.org/docs/1.4.1/api/python/pyspark.sql.html# On Thu, Aug 20, 2015 at 10:58 AM, Ted Yu yuzhih...@gmail.com wrote: See also http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.package Cheers On Thu, Aug 20, 2015 at 7:50 AM, Muhammad Atif muhammadatif...@gmail.com wrote: Hi Dawid The best place to get started is the Spark SQL Guide from Apache http://spark.apache.org/docs/latest/sql-programming-guide.html Regards Muhammad On Thu, Aug 20, 2015 at 5:46 AM, Dawid Wysakowicz wysakowicz.da...@gmail.com wrote: Hi, I would like to dip into SparkSQL. Get to know better the architecture, good practices, some internals. Could you advise me some materials on this matter? Regards Dawid
Re: Data locality with HDFS not being seen
Hi Sunil, Have you seen this fix in Spark 1.5 that may fix the locality issue?: https://issues.apache.org/jira/browse/SPARK-4352 On Thu, Aug 20, 2015 at 4:09 AM, Sunil sdhe...@gmail.com wrote: Hello. I am seeing some unexpected issues with achieving HDFS data locality. I expect the tasks to be executed only on the node which has the data, but this is not happening (of course, unless the node is busy, in which case I understand tasks can go to some other node). Could anyone clarify what's wrong with the way I am trying, or what I should rather do? Below is the cluster configuration and the experiments that I have tried. Any help will be appreciated. If you would like to recreate the below scenario, you may use the JavaWordCount.java example given within Spark. Cluster configuration: 1. spark-1.4.0 and hadoop-2.7.1 2. Machines -- Master node (master) and 6 worker nodes (node1 to node6) 3. master acts as -- spark master, HDFS name node and secondary name node, Yarn resource manager 4. Each of the 6 worker nodes acts as -- spark worker node, HDFS data node, node manager Data on HDFS: A 20 MB text file is stored in a single block. With a replication factor of 3, the text file is stored on nodes 2, 3 and 4. Test-1 (Spark standalone mode): The application being run is the standard Java word count example with the above text file in HDFS as input. On job submission, I see in the Spark web UI that stage-0 (i.e. mapToPair) is being run on random nodes (i.e. node1, node2, node6, etc.). By random I mean that stage-0 executes on the very first worker node that gets registered to the application (this can be seen from the event timeline graph). Rather, I am expecting stage-0 to run only on one of the three nodes 2, 3, or 4. Test-2 (Yarn cluster mode): Same as above. No data locality seen. Additional info: No other Spark applications are running, and I have even tried setting spark.locality.wait to 10s, but still no difference. Thanks and regards, Sunil
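If you want to experiment further with the scheduler's locality patience, here is a minimal sketch of the relevant settings (values illustrative; Spark 1.4 reads them in milliseconds):

val conf = new org.apache.spark.SparkConf()
  .set("spark.locality.wait", "10000")      // how long to wait for a better-located slot
  .set("spark.locality.wait.node", "10000") // node-local wait specifically
val sc = new org.apache.spark.SparkContext(conf)

Note that in Standalone mode an executor typically runs on every worker, so these waits only control which existing executor the task scheduler prefers.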
Re: Caching and Actions
Your point #1 is a bit misleading. (1) The mappers are not executed in parallel when processing independently the same RDD. To clarify, I'd say: In one stage of execution, when pipelining occurs, mappers are not executed in parallel when processing independently the same RDD partition. On Thu, Apr 9, 2015 at 11:19 AM, spark_user_2015 li...@adobe.com wrote: That was helpful! The conclusion: (1) The mappers are not executed in parallel when processing independently the same RDD. (2) The best way seems to be (if enough memory is available and an action is applied to d1 and d2 later on): val d1 = data.map((x,y,z) => (x,y)).cache val d2 = d1.map((x,y) => (y,x)) - This avoids pipelining the d1 mapper and d2 mapper when computing d2. This is important for writing efficient code; toDebugString helps a lot.
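A minimal sketch of the pattern being recommended, with toDebugString to confirm the pipeline break (data is just a stand-in RDD of triples):

val data = sc.parallelize(Seq((1, 2, 3), (4, 5, 6)))
val d1 = data.map { case (x, y, z) => (x, y) }.cache()
val d2 = d1.map { case (x, y) => (y, x) }
d2.count()                 // first action: materializes d1's partitions in memory
println(d2.toDebugString)  // once cached, d1 shows up as CachedPartitions in the lineage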
Re: Apache Spark Executor - number of threads
Hi Igor and Nirandap, There is a cores setting in Spark (SPARK_WORKER_CORES in Standalone mode, or --executor-cores on YARN) that you should look into. This number sets the # of task threads running in each Executor JVM. The name of the setting is a bit misleading: you don't have to match the cores of the Executor to the actual number of CPU cores on the machine. You can, but you don't have to. In general, it is best to start by oversubscribing by a factor of 2x or 3x. So if you have 16 cores on the machine, set this between 32 and 48 to start with. Note that there are other internal threads in the Executor JVM used for things like shuffle. There are about 15 - 20 of them, I think. These internal threads are usually sitting idle and are only used when needed. But these are not the threads you're setting with the cores setting for the Executor. The threads it allocates are for the user tasks (like map or reduce) that you run for your transformations. Think of the cores as the # of slots (from the old MapReduce world). On Wed, Mar 18, 2015 at 12:32 AM, nirandap niranda.per...@gmail.com wrote: Hi devs, I would like to know this as well. It would be great if someone could provide this information. cheers On Tue, Mar 17, 2015 at 3:06 PM, Igor Petrov [via Apache Spark User List] wrote: Hello, is it possible to set the number of threads in the Executor's pool? I see no such setting in the docs. The reason we want to try it: we want to see the performance impact with different levels of parallelism (having one thread per CPU, two threads per CPU, N threads per CPU). Thank You -- Niranda
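As a concrete illustration of the above (flag names as of Spark 1.x; the sizing is just the 2x rule of thumb, not a recommendation for your workload):

# YARN: 16 physical cores per machine, oversubscribed 2x => 32 task threads per executor
spark-submit --master yarn --executor-cores 32 --executor-memory 8g myapp.jar

# Standalone: advertise 32 task slots per worker in conf/spark-env.sh
export SPARK_WORKER_CORES=32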
Re: Executor lost with too many temp files
Hi Marius, Are you using the sort or hash shuffle? Also, do you have the external shuffle service enabled (so that the Worker JVM or NodeManager can still serve the map spill files after an Executor crashes)? How many partitions are in your RDDs before and after the problematic shuffle operation? On Monday, February 23, 2015, Marius Soutier mps@gmail.com wrote: Hi guys, I keep running into a strange problem where my jobs start to fail with the dreaded "Resubmitted (resubmitted due to lost executor)" because of having too many temp files from previous runs. Both /var/run and /spill have enough disk space left, but after a given number of jobs have run, subsequent jobs will struggle with completion. There are a lot of failures without any exception message, only the above mentioned lost executor. As soon as I clear out /var/run/spark/work/ and the spill disk, everything goes back to normal. Thanks for any hint, - Marius
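For reference, a minimal sketch of turning on the sort shuffle and the external shuffle service in conf/spark-defaults.conf (on YARN the service additionally has to be registered as an aux-service in yarn-site.xml):

spark.shuffle.manager          sort
spark.shuffle.service.enabled  true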
Re: Repartition and Worker Instances
In Standalone mode, a Worker JVM starts an Executor. Inside the Executor there are slots for task threads. The slot count is configured by the cores setting (SPARK_WORKER_CORES in Standalone mode). Generally, oversubscribe this. So if you have 10 free CPU cores, set it to 20. On Monday, February 23, 2015, Deep Pradhan pradhandeep1...@gmail.com wrote: How is a task slot different from the # of Workers? "so don't read into any performance metrics you've collected to extrapolate what may happen at scale." I did not get you in this. Thank You On Mon, Feb 23, 2015 at 10:52 PM, Sameer Farooqui same...@databricks.com wrote: In general you should first figure out how many task slots are in the cluster and then repartition the RDD to maybe 2x that #. So if you have 100 slots, then RDDs with a partition count of 100-300 would be normal. But the size of each partition also matters. You want a task to operate on a partition for at least 200 ms, but no longer than around 20 seconds. Even if you have 100 slots, it could be okay to have an RDD with 10,000 partitions if you've read in a large file. So don't repartition your RDD to match the # of Worker JVMs, but rather align it to the total # of task slots in the Executors. If you're running on a single node, shuffle operations become almost free (because there's no network movement), so don't read into any performance metrics you've collected to extrapolate what may happen at scale. On Monday, February 23, 2015, Deep Pradhan pradhandeep1...@gmail.com wrote: Hi, If I repartition my data by a factor equal to the number of worker instances, will the performance be better or worse? As far as I understand, the performance should be better, but in my case it is becoming worse. I have a single node standalone cluster; is it because of this? Am I guaranteed to have better performance if I do the same thing in a multi-node cluster? Thank You
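A minimal sketch of sizing by slots rather than workers (data is a stand-in RDD):

val slots = sc.defaultParallelism   // roughly the total task slots across executors
val tuned = data.repartition(slots * 2)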
Re: Repartition and Worker Instances
In general you should first figure out how many task slots are in the cluster and then repartition the RDD to maybe 2x that #. So if you have 100 slots, then RDDs with a partition count of 100-300 would be normal. But the size of each partition also matters. You want a task to operate on a partition for at least 200 ms, but no longer than around 20 seconds. Even if you have 100 slots, it could be okay to have an RDD with 10,000 partitions if you've read in a large file. So don't repartition your RDD to match the # of Worker JVMs, but rather align it to the total # of task slots in the Executors. If you're running on a single node, shuffle operations become almost free (because there's no network movement), so don't read into any performance metrics you've collected to extrapolate what may happen at scale. On Monday, February 23, 2015, Deep Pradhan pradhandeep1...@gmail.com wrote: Hi, If I repartition my data by a factor equal to the number of worker instances, will the performance be better or worse? As far as I understand, the performance should be better, but in my case it is becoming worse. I have a single node standalone cluster; is it because of this? Am I guaranteed to have better performance if I do the same thing in a multi-node cluster? Thank You
Re: Missing shuffle files
Do you guys have dynamic allocation turned on for YARN? Anders, was Task 450 in your job acting like a Reducer and fetching the Map spill output data from a different node? If a Reducer task can't read the remote data it needs, that could cause the stage to fail. Sometimes this forces the previous stage to also be re-computed if it's a wide dependency. But like Petar said, if you turn the external shuffle service on, the YARN NodeManager process on the slave machines will serve out the map spill data instead of the Executor JVMs (by default, unless you turn external shuffle on, the Executor JVM itself serves out the shuffle data, which causes problems if an Executor dies). Corey, how often are Executors crashing in your app? How many Executors do you have in total? And what is the memory size of each? You can change what fraction of the Executor heap will be used for your user code vs the shuffle vs RDD caching with the spark.storage.memoryFraction setting. On Sat, Feb 21, 2015 at 2:58 PM, Petar Zecevic petar.zece...@gmail.com wrote: Could you try to turn on the external shuffle service? spark.shuffle.service.enabled = true On 21.2.2015. 17:50, Corey Nolet wrote: I'm experiencing the same issue. Upon closer inspection I'm noticing that executors are being lost as well. Thing is, I can't figure out how they are dying. I'm using MEMORY_AND_DISK_SER and I've got over 1.3 TB of memory allocated for the application. I was thinking perhaps it was possible that a single executor was getting a single or a couple of large partitions, but shouldn't the disk persistence kick in at that point? On Sat, Feb 21, 2015 at 11:20 AM, Anders Arpteg arp...@spotify.com wrote: For large jobs, the following error message is shown that seems to indicate that shuffle files for some reason are missing. It's a rather large job with many partitions. If the data size is reduced, the problem disappears. I'm running a build from Spark master post 1.2 (built at 2015-01-16) and running on Yarn 2.2. Any idea of how to resolve this problem?
User class threw exception: Job aborted due to stage failure: Task 450 in stage 450.1 failed 4 times, most recent failure: Lost task 450.3 in stage 450.1 (TID 167370, lon4-hadoopslave-b77.lon4.spotify.net): java.io.FileNotFoundException: /disk/hd06/yarn/local/usercache/arpteg/appcache/application_1424333823218_21217/spark-local-20150221154811-998c/03/rdd_675_450 (No such file or directory)
at java.io.FileOutputStream.open(Native Method)
at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
at java.io.FileOutputStream.<init>(FileOutputStream.java:171)
at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:76)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:786)
at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:637)
at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:149)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:74)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:264)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:231)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:192)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
TIA, Anders
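If executors are dying from memory pressure, a sketch of the heap-split knobs mentioned above (the values are illustrative; Spark 1.x defaults noted in the comments):

spark.storage.memoryFraction   0.4   # RDD cache share of the heap, default 0.6
spark.shuffle.memoryFraction   0.3   # shuffle aggregation share, default 0.2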
Re: pyspark is crashing in this case. why?
Adding the group back. FYI Genesis - this was on an m3.xlarge with all default settings in Spark. I used Spark version 1.3.0. The 2nd case did work for me:

a = [1,2,3,4,5,6,7,8,9]
b = []
for x in range(1000000):
...   b.append(a)
...
rdd1 = sc.parallelize(b)
rdd1.first()
14/12/15 16:33:01 WARN TaskSetManager: Stage 1 contains a task of very large size (9766 KB). The maximum recommended task size is 100 KB.
[1, 2, 3, 4, 5, 6, 7, 8, 9]

On Mon, Dec 15, 2014 at 1:33 PM, Sameer Farooqui same...@databricks.com wrote: Hi Genesis, The 2nd case did work for me:

a = [1,2,3,4,5,6,7,8,9]
b = []
for x in range(1000000):
...   b.append(a)
...
rdd1 = sc.parallelize(b)
rdd1.first()
14/12/15 16:33:01 WARN TaskSetManager: Stage 1 contains a task of very large size (9766 KB). The maximum recommended task size is 100 KB.
[1, 2, 3, 4, 5, 6, 7, 8, 9]

On Sun, Dec 14, 2014 at 2:13 PM, Genesis Fatum genesis.fa...@gmail.com wrote: Hi Sameer, I have tried multiple configurations. For example, executor and driver memory at 2G. Also played with the JRE memory size parameters (-Xms) and got the same error. Does it work for you? I think it is a setup issue on my side, although I have tried a couple of laptops. Thanks On Sun, Dec 14, 2014 at 1:11 PM, Sameer Farooqui same...@databricks.com wrote: How much executor-memory are you setting for the JVM? What about the Driver JVM memory? Also check the Windows Event Log for Out of Memory errors for one of the 2 above JVMs. On Dec 14, 2014 6:04 AM, genesis fatum genesis.fa...@gmail.com wrote: Hi, My environment is: standalone spark 1.1.1 on windows 8.1 pro. The following case works fine:

a = [1,2,3,4,5,6,7,8,9]
b = []
for x in range(100000):
...   b.append(a)
...
rdd1 = sc.parallelize(b)
rdd1.first()
[1, 2, 3, 4, 5, 6, 7, 8, 9]

The following case does not work. The only difference is the size of the array. Note the loop range: 100K vs. 1M.

a = [1,2,3,4,5,6,7,8,9]
b = []
for x in range(1000000):
...   b.append(a)
...
rdd1 = sc.parallelize(b)
rdd1.first()
14/12/14 07:52:19 ERROR PythonRDD: Python worker exited unexpectedly (crashed)
java.net.SocketException: Connection reset by peer: socket write error
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(Unknown Source)
at java.net.SocketOutputStream.write(Unknown Source)
at java.io.BufferedOutputStream.flushBuffer(Unknown Source)
at java.io.BufferedOutputStream.write(Unknown Source)
at java.io.DataOutputStream.write(Unknown Source)
at java.io.FilterOutputStream.write(Unknown Source)
at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:341)
at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:339)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:339)
at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:209)
at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1364)
at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:183)
What I have tried: 1. Replaced JRE 32bit with JRE 64bit 2. Multiple configurations when I start pyspark: --driver-memory, --executor-memory 3.
Tried to set the SparkConf with different settings 4. Tried also with spark 1.1.0 Being new to Spark, I am sure that it is something simple that I am missing and would appreciate any thoughts.
Re: pyspark is crashing in this case. why?
How much executor-memory are you setting for the JVM? What about the Driver JVM memory? Also check the Windows Event Log for Out of Memory errors for one of the 2 above JVMs. On Dec 14, 2014 6:04 AM, genesis fatum genesis.fa...@gmail.com wrote: Hi, My environment is: standalone spark 1.1.1 on windows 8.1 pro. The following case works fine:

a = [1,2,3,4,5,6,7,8,9]
b = []
for x in range(100000):
...   b.append(a)
...
rdd1 = sc.parallelize(b)
rdd1.first()
[1, 2, 3, 4, 5, 6, 7, 8, 9]

The following case does not work. The only difference is the size of the array. Note the loop range: 100K vs. 1M.

a = [1,2,3,4,5,6,7,8,9]
b = []
for x in range(1000000):
...   b.append(a)
...
rdd1 = sc.parallelize(b)
rdd1.first()
14/12/14 07:52:19 ERROR PythonRDD: Python worker exited unexpectedly (crashed)
java.net.SocketException: Connection reset by peer: socket write error
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(Unknown Source)
at java.net.SocketOutputStream.write(Unknown Source)
at java.io.BufferedOutputStream.flushBuffer(Unknown Source)
at java.io.BufferedOutputStream.write(Unknown Source)
at java.io.DataOutputStream.write(Unknown Source)
at java.io.FilterOutputStream.write(Unknown Source)
at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:341)
at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:339)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:339)
at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:209)
at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1364)
at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:183)
What I have tried: 1. Replaced JRE 32bit with JRE 64bit 2. Multiple configurations when I start pyspark: --driver-memory, --executor-memory 3. Tried to set the SparkConf with different settings 4. Tried also with spark 1.1.0 Being new to Spark, I am sure that it is something simple that I am missing and would appreciate any thoughts.
Re: how to convert an rdd to a single output file
Instead of doing this on the compute side, I would just write out the file with different blocks initially into HDFS and then use hadoop fs -getmerge or HDFSConcat to get one final output file. - SF On Fri, Dec 12, 2014 at 11:19 AM, Steve Lewis lordjoe2...@gmail.com wrote: I have an RDD which is potentially too large to store in memory with collect. I want a single task to write the contents as a file to HDFS. Time is not a large issue but memory is. I use the following, converting my RDD (scans) to a local Iterator. This works, but hasNext shows up as a separate task and takes on the order of 20 sec for a medium sized job - is toLocalIterator a bad function to call in this case, and is there a better one?

public void writeScores(final Appendable out, JavaRDD<IScoredScan> scans) {
    writer.appendHeader(out, getApplication());
    Iterator<IScoredScan> scanIterator = scans.toLocalIterator();
    while (scanIterator.hasNext()) {
        IScoredScan scan = scanIterator.next();
        writer.appendScan(out, getApplication(), scan);
    }
    writer.appendFooter(out, getApplication());
}
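A sketch of the merge approach (paths hypothetical): first write in parallel from the job with scans.saveAsTextFile("hdfs:///out/scores"), then collapse the part files from a shell:

hadoop fs -getmerge /out/scores /tmp/scores.txt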
Re: resource allocation spark on yarn
Hi, FYI - There are no Worker JVMs used when Spark is launched under YARN. Instead, the NodeManager in YARN does what the Worker JVM does in Spark Standalone mode. For YARN you'll want to look into the following settings: --num-executors: controls how many executors will be allocated --executor-memory: RAM for each executor --executor-cores: CPU cores for each executor Also, look into the following for Dynamic Allocation: spark.dynamicAllocation.enabled spark.dynamicAllocation.minExecutors spark.dynamicAllocation.maxExecutors spark.dynamicAllocation.sustainedSchedulerBacklogTimeout spark.dynamicAllocation.schedulerBacklogTimeout spark.dynamicAllocation.executorIdleTimeout Link to the Dynamic Allocation code (with comments on how to use this feature): https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala On Fri, Dec 12, 2014 at 10:52 AM, gpatcham gpatc...@gmail.com wrote: Hi All, I have Spark on YARN and there are multiple Spark jobs on the cluster. Sometimes some jobs are not getting enough resources even when there are enough free resources available on the cluster, even when I use the settings below: --num-workers 75 \ --worker-cores 16 Jobs stick with the resources they get when the job started. Do we need to look at any other configs? Can someone give pointers on this issue? Thanks
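A minimal sketch of a dynamic-allocation setup in conf/spark-defaults.conf (the numbers are hypothetical; note that dynamic allocation also requires the external shuffle service on the NodeManagers):

spark.dynamicAllocation.enabled        true
spark.dynamicAllocation.minExecutors   10
spark.dynamicAllocation.maxExecutors   75
spark.shuffle.service.enabled          true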
Re: broadcast: OutOfMemoryError
Is the OOM happening in the Driver JVM or one of the Executor JVMs? What memory size is each JVM? How large is the data you're trying to broadcast? If it's large enough, you may want to consider just persisting the data to distributed storage (like HDFS) and reading it in through the normal RDD read methods like sc.textFile(). Maybe someone else can comment on what the largest recommended data collection sizes are to use with Broadcast... On Thu, Dec 11, 2014 at 10:14 AM, ll duy.huynh@gmail.com wrote: Hi. I'm running into this OutOfMemory issue when I'm broadcasting a large array. What is the best way to handle this? Should I split the array into smaller arrays before broadcasting, and then combine them locally at each node? Thanks!
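A minimal sketch of the HDFS fallback described above (the path is hypothetical, and bigArray stands in for whatever is being broadcast):

val bigArray = (1 to 5000000).toArray          // stand-in for the large array
// once, from the driver: persist it instead of broadcasting it
sc.parallelize(bigArray).saveAsTextFile("hdfs:///tmp/lookup")
// in each job: read it back as an ordinary RDD and join against it
val lookup = sc.textFile("hdfs:///tmp/lookup")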
Re: spark-submit on YARN is slow
Just an FYI - I can submit the SparkPi app to YARN in cluster mode on a 1-node m3.xlarge EC2 instance and the app finishes running successfully in about 40 seconds. I just figured the 30 - 40 sec run time was normal b/c of the submitting overhead that Andrew mentioned. Denny, you can maybe also try to run SparkPi against YARN as a speed check. spark-submit --class org.apache.spark.examples.SparkPi --deploy-mode cluster --master yarn /opt/cloudera/parcels/CDH-5.2.1-1.cdh5.2.1.p0.12/jars/spark-examples-1.1.0-cdh5.2.1-hadoop2.5.0-cdh5.2.1.jar 10 On Fri, Dec 5, 2014 at 2:32 PM, Denny Lee denny.g@gmail.com wrote: My submissions of Spark on YARN (CDH 5.2) resulted in a few thousand steps. If I was running this in standalone cluster mode the query finished in 55s, but on YARN the query was still running 30 min later. Would the hard-coded sleeps potentially be in play here? On Fri, Dec 5, 2014 at 11:23, Sandy Ryza sandy.r...@cloudera.com wrote: Hi Tobias, What version are you using? In some recent versions, we had a couple of large hardcoded sleeps on the Spark side. -Sandy On Fri, Dec 5, 2014 at 11:15 AM, Andrew Or and...@databricks.com wrote: Hey Tobias, As you suspect, the reason why it's slow is because the resource manager in YARN takes a while to grant resources. This is because YARN needs to first set up the application master container, and then this AM needs to request more containers for the Spark executors. I think this accounts for most of the overhead. The remaining source probably comes from how our own YARN integration code polls application state (every second) and cluster resource states (every 5 seconds IIRC). I haven't explored in detail whether there are optimizations there that can speed this up, but I believe most of the overhead comes from YARN itself. In other words, no, I don't know of any quick fix on your end that you can do to speed this up. -Andrew 2014-12-03 20:10 GMT-08:00 Tobias Pfeiffer t...@preferred.jp: Hi, I am using spark-submit to submit my application to YARN in yarn-cluster mode. I have both the Spark assembly jar file as well as my application jar file put in HDFS and can see from the logging output that both files are used from there. However, it still takes about 10 seconds for my application's yarnAppState to switch from ACCEPTED to RUNNING. I am aware that this is probably not a Spark issue, but some YARN configuration setting (or YARN-inherent slowness). I was just wondering if anyone has advice on how to speed this up. Thanks Tobias
Re: Java RDD Union
Hi Ron, Out of curiosity, why do you think that union is modifying an existing RDD in place? In general all transformations, including union, will create new RDDs, not modify old RDDs in place. Here's a quick test:

scala> val firstRDD = sc.parallelize(1 to 5)
firstRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:12
scala> val secondRDD = sc.parallelize(1 to 3)
secondRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:12
scala> firstRDD.collect()
res1: Array[Int] = Array(1, 2, 3, 4, 5)
scala> secondRDD.collect()
res2: Array[Int] = Array(1, 2, 3)
scala> val newRDD = firstRDD.union(secondRDD)
newRDD: org.apache.spark.rdd.RDD[Int] = UnionRDD[4] at union at <console>:16
scala> newRDD.collect()
res3: Array[Int] = Array(1, 2, 3, 4, 5, 1, 2, 3)
scala> firstRDD.collect()
res4: Array[Int] = Array(1, 2, 3, 4, 5)
scala> secondRDD.collect()
res5: Array[Int] = Array(1, 2, 3)

On Fri, Dec 5, 2014 at 2:27 PM, Ron Ayoub ronalday...@live.com wrote: I'm a bit confused regarding the expected behavior of unions. I'm running on 8 cores. I have an RDD that is used to collect cluster associations (cluster id, content id, distance) for internal clusters as well as leaf clusters, since I'm doing hierarchical k-means and need all distances for sorting documents appropriately upon examination. It appears that union simply adds items in the argument to the RDD instance the method is called on, rather than just returning a new RDD. If I want to use union this way, as more of an add/append, should I be capturing the return value and releasing it from memory? Need help clarifying the semantics here. Also, in another related thread someone mentioned coalesce after union. Would I need to do the same on the instance RDD I'm calling union on? Perhaps a method such as append would be useful and clearer.
Re: Any ideas why a few tasks would stall
Good point, Ankit. Steve - You can click on the link for '27' in the first column to get a breakdown of how much data is in each of those 116 cached partitions. But really, you want to also understand how much data is in the 4 non-cached partitions, as they may be huge. One thing you can try doing is .repartition() on the RDD with something like 100 partitions and then cache this new RDD. See if that spreads the load between the partitions more evenly. Let us know how it goes. On Thu, Dec 4, 2014 at 12:16 AM, Ankit Soni ankitso...@gmail.com wrote: I ran into something similar before. 19/20 partitions would complete very quickly, and 1 would take the bulk of time and shuffle reads/writes. This was because the majority of partitions were empty, and 1 had all the data. Perhaps something similar is going on here - I would suggest taking a look at how much data each partition contains and trying to achieve a roughly even distribution for best performance. In particular, if the RDDs are PairRDDs, partitions are assigned based on the hash of the key, so an even distribution of values among keys is required for an even split of data across partitions. On December 2, 2014 at 4:15:25 PM, Steve Lewis (lordjoe2...@gmail.com) wrote: 1) I can go there but none of the links are clickable 2) when I see something like 116/120 partitions succeeded in the stages UI, in the storage UI I see (NOTE: RDD 27 has 116 partitions cached - 4 not - and those are exactly the number of machines which will not complete. Also RDD 27 does not show up in the Stages UI.)

RDD Name | Storage Level                     | Cached Partitions | Fraction Cached | Size in Memory | Size in Tachyon | Size on Disk
2        | Memory Deserialized 1x Replicated | 1                 | 100%            | 11.8 MB        | 0.0 B           | 0.0 B
14       | Memory Deserialized 1x Replicated | 1                 | 100%            | 122.7 MB       | 0.0 B           | 0.0 B
7        | Memory Deserialized 1x Replicated | 120               | 100%            | 151.1 MB       | 0.0 B           | 0.0 B
1        | Memory Deserialized 1x Replicated | 1                 | 100%            | 65.6 MB        | 0.0 B           | 0.0 B
10       | Memory Deserialized 1x Replicated | 24                | 100%            | 160.6 MB       | 0.0 B           | 0.0 B
27       | Memory Deserialized 1x Replicated | 116               | 97%             |                |                 |

On Tue, Dec 2, 2014 at 3:43 PM, Sameer Farooqui same...@databricks.com wrote: Have you tried taking thread dumps via the UI? There is a link to do so on the Executors page (typically at http://<driver IP>:4040/executors). By visualizing the thread call stack of the executors with slow running tasks, you can see exactly what code is executing at an instant in time. If you sample the executor several times in a short time period, you can identify 'hot spots' or expensive sections in the user code. On Tue, Dec 2, 2014 at 3:03 PM, Steve Lewis lordjoe2...@gmail.com wrote: I am working on a problem which will eventually involve many millions of function calls. I have a small sample with several thousand calls working, but when I try to scale up the amount of data things stall. I use 120 partitions and 116 finish in very little time. The remaining 4 seem to do all the work and stall after a fixed number (about 1000) of calls, and even after hours make no more progress. This is my first large and complex job with spark and I would like any insight on how to debug the issue, or even better why it might exist. The cluster has 15 machines and I am setting executor memory at 16G.
Also, what other questions are relevant to solving the issue? -- Steven M. Lewis PhD 4221 105th Ave NE Kirkland, WA 98033 206-384-1340 (cell) Skype lordjoe_com
Re: Necessity for rdd replication.
In general, most use cases don't need the RDD to be replicated in memory multiple times. It would be a rare exception to do this. If it's really expensive (time consuming) to recompute a lost partition, or if the use case is extremely time sensitive, then maybe you could replicate it in memory. But in general, you can safely rely on the RDD lineage graph to re-create a lost partition if it gets discarded from memory. As far as extracting better parallelism if the RDD is replicated, that really depends on what sort of transformations and operations you're running against the RDD, but again, generally speaking, you shouldn't need to replicate it. On Wed, Dec 3, 2014 at 11:54 PM, rapelly kartheek kartheek.m...@gmail.com wrote: Hi, I was just thinking about the necessity for rdd replication. One category could be something like a large number of threads requiring the same rdd. Even though a single rdd can be shared by multiple threads belonging to the same application, I believe we can extract better parallelism if the rdd is replicated, am I right? I am eager to know if there are any real life applications or any other scenarios which force an rdd to be replicated. Can someone please throw some light on the necessity for rdd replication? Thank you
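For completeness, a sketch of how replication is requested in the rare cases where it is warranted (the path is hypothetical):

import org.apache.spark.storage.StorageLevel

val events = sc.textFile("hdfs:///data/events")
events.persist(StorageLevel.MEMORY_ONLY_2)  // the _2 levels keep each cached partition on two executors
events.count()                              // materializes both replicas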
Re: Monitoring Spark
Are you running Spark in Local or Standalone mode? In either mode, you should be able to hit port 4040 (to see the Spark Jobs/Stages/Storage/Executors UI) on the machine where the driver is running. However, in Local mode, you won't have a Spark Master UI on 8080 or a Worker UI on 8081. You can manually set the Spark Stages UI port to something other than 4040 (in case there are conflicts) with the spark.ui.port setting. Also, after setting spark.eventLog.enabled to true, you may also want to specify spark.eventLog.dir as a globally visible filesystem like HDFS (unless you're running in local mode). On Wed, Dec 3, 2014 at 10:01 AM, Isca Harmatz pop1...@gmail.com wrote: hello, I'm running Spark on a standalone station and I'm trying to view the event log after the run is finished. I turned on the event log as the site said (spark.eventLog.enabled set to true) but I can't find the log files or get the web UI to work. Any idea on how to do this? thanks Isca
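A minimal sketch of the settings discussed above in conf/spark-defaults.conf (the HDFS path and port are illustrative):

spark.eventLog.enabled  true
spark.eventLog.dir      hdfs:///spark-events
spark.ui.port           4041

Once event logs are being written there, sbin/start-history-server.sh can serve the finished applications' UIs from that directory.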
Re: Any ideas why a few tasks would stall
Have you tried taking thread dumps via the UI? There is a link to do so on the Executors page (typically at http://<driver IP>:4040/executors). By visualizing the thread call stack of the executors with slow running tasks, you can see exactly what code is executing at an instant in time. If you sample the executor several times in a short time period, you can identify 'hot spots' or expensive sections in the user code. On Tue, Dec 2, 2014 at 3:03 PM, Steve Lewis lordjoe2...@gmail.com wrote: I am working on a problem which will eventually involve many millions of function calls. I have a small sample with several thousand calls working, but when I try to scale up the amount of data things stall. I use 120 partitions and 116 finish in very little time. The remaining 4 seem to do all the work and stall after a fixed number (about 1000) of calls, and even after hours make no more progress. This is my first large and complex job with spark and I would like any insight on how to debug the issue, or even better why it might exist. The cluster has 15 machines and I am setting executor memory at 16G. Also, what other questions are relevant to solving the issue?
Re: Spark setup on local windows machine
Hi Sunita, This gitbook may also be useful for you to get Spark running in local mode on your Windows machine: http://blueplastic.gitbooks.io/how-to-light-your-spark-on-a-stick/content/ On Tue, Nov 25, 2014 at 11:09 PM, Akhil Das ak...@sigmoidanalytics.com wrote: You could try following these guidelines http://docs.sigmoidanalytics.com/index.php/How_to_build_SPARK_on_Windows Thanks Best Regards On Wed, Nov 26, 2014 at 12:24 PM, Sunita Arvind sunitarv...@gmail.com wrote: Hi All, I just installed Spark on my laptop and am trying to get spark-shell to work. Here is the error I see:

C:\spark\bin>spark-shell
Exception in thread "main" java.util.NoSuchElementException: key not found: CLASSPATH
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:58)
at scala.collection.MapLike$class.apply(MapLike.scala:141)
at scala.collection.AbstractMap.apply(Map.scala:58)
at org.apache.spark.deploy.SparkSubmitDriverBootstrapper$.main(SparkSubmitDriverBootstrapper.scala:49)
at org.apache.spark.deploy.SparkSubmitDriverBootstrapper.main(SparkSubmitDriverBootstrapper.scala)

The classpath seems to be right:

C:\spark\bin>compute-classpath.cmd
;;C:\spark\bin\..\conf;C:\spark\bin\..\lib\spark-assembly-1.1.0-hadoop2.3.0.jar;;C:\spark\bin\..\lib\datanucleus-api-jdo-3.2.1.jar;C:\spark\bin\..\lib\datanucleus-core-3.2.2.jar;C:\spark\bin\..\lib\datanucleus-rdbms-3.2.1.jar

Manually exporting the classpath to include the assembly jar doesn't help either. What could be wrong with this installation? Scala and SBT are installed, in the path and working fine. Appreciate your help. regards Sunita
Re: Doing RDD.count in parallel, or at least parallelize it as much as possible?
Hi Shahab, Are you running Spark in Local, Standalone, YARN or Mesos mode? If you're running in Standalone/YARN/Mesos, then the .count() action is indeed automatically parallelized across multiple Executors. When you run a .count() on an RDD, it is actually distributing tasks to different executors to each do a local count on a local partition, and then all the tasks send their sub-counts back to the driver for final aggregation. This sounds like the kind of behavior you're looking for. However, in Local mode, everything runs in a single JVM (the driver + executor), so there's no parallelization across Executors. On Thu, Oct 30, 2014 at 10:25 AM, shahab shahab.mok...@gmail.com wrote: Hi, I noticed that the count (of an RDD) in many of my queries is the most time consuming one, as it runs in the driver process rather than being done by parallel worker nodes. Is there any way to perform count in parallel, or at least parallelize it as much as possible? best, /Shahab
Re: Doing RDD.count in parallel, or at least parallelize it as much as possible?
By the way, in case you haven't done so, do try to .cache() the RDD before running a .count() on it, as that could make a big speed improvement. On Thu, Oct 30, 2014 at 11:12 AM, Sameer Farooqui same...@databricks.com wrote: Hi Shahab, Are you running Spark in Local, Standalone, YARN or Mesos mode? If you're running in Standalone/YARN/Mesos, then the .count() action is indeed automatically parallelized across multiple Executors. When you run a .count() on an RDD, it is actually distributing tasks to different executors to each do a local count on a local partition, and then all the tasks send their sub-counts back to the driver for final aggregation. This sounds like the kind of behavior you're looking for. However, in Local mode, everything runs in a single JVM (the driver + executor), so there's no parallelization across Executors. On Thu, Oct 30, 2014 at 10:25 AM, shahab shahab.mok...@gmail.com wrote: Hi, I noticed that the count (of an RDD) in many of my queries is the most time consuming one, as it runs in the driver process rather than being done by parallel worker nodes. Is there any way to perform count in parallel, or at least parallelize it as much as possible? best, /Shahab
Re: SparkContext UI
Hey Stuart, The RDD won't show up under the Storage tab in the UI until it's been cached. Basically Spark doesn't know what the RDD will look like until it's cached, b/c up until then the RDD is just on disk (external to Spark). If you launch some transformations + an action on an RDD that is purely on disk, then Spark will read it from disk, compute against it and then write the results back to disk or show you the results at the scala/python shells. But when you run Spark workloads against purely on-disk files, the RDD won't show up in Spark's Storage UI. Hope that makes sense... - Sameer On Thu, Oct 30, 2014 at 4:30 PM, Stuart Horsman stuart.hors...@gmail.com wrote: Hi All, When I load an RDD with: data = sc.textFile(somefile) I don't see the resulting RDD in the SparkContext gui on localhost:4040 in /storage. Is there something special I need to do to allow me to view this? I tried both the scala and python shells but got the same result. Thanks Stuart
Re: SparkContext UI
Hi Stuart, You're close! Just add a () after the cache, like: data.cache() ...and then run the .count() action on it and you should be good to see it in the Storage UI! - Sameer On Thu, Oct 30, 2014 at 4:50 PM, Stuart Horsman stuart.hors...@gmail.com wrote: Sorry, too quick to pull the trigger on my original email. I should have added that I tried using persist() and cache() but no joy. I'm doing this: data = sc.textFile(somedata) data.cache data.count() but I still can't see anything in the storage? On 31 October 2014 10:42, Sameer Farooqui same...@databricks.com wrote: Hey Stuart, The RDD won't show up under the Storage tab in the UI until it's been cached. Basically Spark doesn't know what the RDD will look like until it's cached, b/c up until then the RDD is just on disk (external to Spark). If you launch some transformations + an action on an RDD that is purely on disk, then Spark will read it from disk, compute against it and then write the results back to disk or show you the results at the scala/python shells. But when you run Spark workloads against purely on-disk files, the RDD won't show up in Spark's Storage UI. Hope that makes sense... - Sameer On Thu, Oct 30, 2014 at 4:30 PM, Stuart Horsman stuart.hors...@gmail.com wrote: Hi All, When I load an RDD with: data = sc.textFile(somefile) I don't see the resulting RDD in the SparkContext gui on localhost:4040 in /storage. Is there something special I need to do to allow me to view this? I tried both the scala and python shells but got the same result. Thanks Stuart
Re: spark-submit memory too large
That does seem a bit odd. How many Executors are running under this Driver? Does the spark-submit process start out using ~60GB of memory right away, or does it start out smaller and slowly build up to that? If so, how long does it take to get that high? Also, which version of Spark are you using? SameerF On Fri, Oct 24, 2014 at 8:07 AM, marylucy qaz163wsx_...@hotmail.com wrote: I used standalone Spark and set spark.driver.memory=5g, but the spark-submit process uses 57g of memory. Is this normal? How can I decrease it?
Re: Spark Streaming Applications
Hi Saiph, Patrick McFadin and Helena Edelson from DataStax taught a tutorial at NYC Strata last week where they created a prototype Spark Streaming + Kafka application for time series data. You can see the code here: https://github.com/killrweather/killrweather On Tue, Oct 21, 2014 at 4:33 PM, Saiph Kappa saiph.ka...@gmail.com wrote: Hi, I have been trying to find a fairly complex application that makes use of the Spark Streaming framework. I checked public github repos, but the examples I found were too simple, only comprising simple operations like counters and sums. On the Spark Summit website I could find very interesting projects; however, no source code was available. Where can I find non-trivial spark streaming application code? Is it that difficult? Thanks.
Re: Setting only master heap
Hi Keith, It would be helpful if you could post the error message. Are you running Spark in Standalone mode or with YARN? In general, the Spark Master is only used for scheduling and it should be fine with the default setting of 512 MB RAM. Is it actually the Spark Driver's memory that you intended to change? ++ If in Standalone mode ++ You're right that SPARK_DAEMON_MEMORY sets the memory to allocate to the Spark Master, Worker and even HistoryServer daemons together. SPARK_WORKER_MEMORY is slightly confusing. In Standalone mode, it is the amount of memory that a worker advertises as available for drivers to launch executors. The sum of the memory used by executors spawned from a worker cannot exceed SPARK_WORKER_MEMORY. Unfortunately, I'm not aware of a way to set the memory for Master and Worker individually, other than launching them manually. You can also try setting the config differently on each machine's spark-env.sh file. ++ If in YARN mode ++ In YARN, there is no setting for SPARK_DAEMON_MEMORY. Therefore this is only in the Standalone documentation. Remember that in YARN mode there is no Spark Worker; instead the YARN NodeManager launches the Executors. And in YARN, there is no need to run a Spark Master JVM (since the YARN ResourceManager takes care of the scheduling). So, with YARN, use SPARK_EXECUTOR_MEMORY to set the Executor's memory. And use SPARK_DRIVER_MEMORY to set the Driver's memory. Just an FYI - for compatibility's sake, even in YARN mode there is a setting for SPARK_WORKER_MEMORY, but it has been deprecated. If you do set it, it just does the same thing as setting SPARK_EXECUTOR_MEMORY would have done. - Sameer On Wed, Oct 22, 2014 at 1:46 PM, Keith Simmons ke...@pulse.io wrote: We've been getting some OOMs from the Spark master since upgrading to Spark 1.1.0. I've found SPARK_DAEMON_MEMORY, but that also seems to increase the worker heap, which as far as I know is fine. Is there any setting which *only* increases the master heap size? Keith
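A sketch of the per-machine workaround mentioned above (the value is illustrative): in conf/spark-env.sh on the master machine only, set

SPARK_DAEMON_MEMORY=2g   # applies to whichever daemons (the Master here) this box launches

and leave the workers' spark-env.sh at the default so their daemon heaps stay small.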
Re: spark ui redirecting to port 8100
Hi Sadhan, Which port are you specifically trying to redirect? The driver program has a web UI, typically on port 4040, and the Spark Standalone Cluster Master has a UI exposed on port 8080. Which setting did you update, and in which file, to make this change? And finally, which version of Spark are you on? Sameer F. Client Services @ Databricks On Tue, Oct 21, 2014 at 3:29 PM, sadhan sadhan.s...@gmail.com wrote: I set the Spark port to a different one and the connection seems successful, but I get a 302 to /proxy on port 8100? Nothing is listening on that port as well.
Re: Spark Streaming - How to write RDD's in same directory ?
Hi Shailesh, Spark just leverages the Hadoop File Output Format to write out the RDD you are saving. This is really a Hadoop OutputFormat limitation, which requires that the directory it is writing into does not exist. The idea is that a Hadoop job should not be able to overwrite the results from a previous job, so it enforces that the dir should not exist. The easiest way to get around this may be to just write the results from each Spark app to a newly named directory, then on an interval run a simple script to merge data from multiple HDFS directories into one directory. This HDFS command will let you do something like a directory merge: hdfs dfs -cat /folderpath/folder* | hdfs dfs -copyFromLocal - /newfolderpath/file See this StackOverflow discussion for a way to do it using Pig and Bash scripting also: https://stackoverflow.com/questions/19979896/combine-map-reduce-output-from-different-folders-into-single-folder Sameer F. Client Services @ Databricks On Tue, Oct 21, 2014 at 3:51 PM, Shailesh Birari sbir...@wynyardgroup.com wrote: Hello, Spark 1.1.0, Hadoop 2.4.1. I have written a Spark Streaming application, and I am getting a FileAlreadyExistsException for rdd.saveAsTextFile(outputFolderPath). Here is briefly what I am trying to do. My application creates a text file stream using the Java streaming context; the input file is on HDFS:

JavaDStream<String> textStream = ssc.textFileStream(InputFile);

Then it compares each line of the input stream with some data and filters it. The filtered data is stored in a JavaDStream<String>:

JavaDStream<String> suspectedStream = textStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public Iterable<String> call(String line) throws Exception {
        List<String> filteredList = new ArrayList<String>();
        // doing filter job
        return filteredList;
    }
});

And this filtered data I am storing in HDFS as:

suspectedStream.foreach(new Function<JavaRDD<String>, Void>() {
    @Override
    public Void call(JavaRDD<String> rdd) throws Exception {
        rdd.saveAsTextFile(outputFolderPath);
        return null;
    }
});

But with this I am receiving org.apache.hadoop.mapred.FileAlreadyExistsException. I tried appending a random number to outputFolderPath and it works, but my requirement is to collect all output in one directory. Can you please suggest if there is any way to get rid of this exception? Thanks, Shailesh
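A sketch of the per-batch directory naming inside the foreach above (the timestamp suffix is just one illustrative choice):

rdd.saveAsTextFile(outputFolderPath + "/" + System.currentTimeMillis());

Each micro-batch then lands in its own directory, and the periodic merge described above collapses them into one.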