Hello all,

Apologies for the late response, this thread slipped under my radar. There are a number of things that can be done to improve performance. Here are some of them, off the top of my head, based on what you have mentioned. Most of them are covered in the streaming guide's performance tuning section <http://spark.apache.org/docs/latest/streaming-programming-guide.html#reducing-the-processing-time-of-each-batch>.
1. Good input distribution - If the data ingestion can be distributed across multiple receivers, then it is worth doing so. Partitioning the data streams and parallelizing the receivers distributes the serialization-deserialization cost of receiving. Note that you can create multiple input streams (presumably of the same type) and do a union on them (StreamingContext.union) to create a single DStream out of them.

2. Good task distribution - The tasks, especially the first map stage on the input stream(s), should be well distributed across the workers in the cluster. If they are not, then it is sometimes worth repartitioning the DStream (DStream.repartition) so that the computation parallelizes well. This can improve performance.

3. Enable Kryo serialization - Set the Spark conf property "spark.serializer" [see configuration guide <http://spark.apache.org/docs/latest/configuration.html>]. This can improve performance across the board.

4. Enable Concurrent Mark-Sweep GC - This helps with more stable batch processing times. You can see the GC overheads per stage in the web UI; if they are on the order of hundreds of milliseconds, then it is worth tuning the GC.

5. Store strings as byte arrays, and do all string transformations as byte-array transformations (that is, implement extractWordOnePairs on byte arrays). This can give a very significant boost in throughput, as the Java string library is not very efficient. Granted, this requires more work manipulating bytes yourself.

Regarding the 5-second pauses, the culprit could very well be the disk. Since shuffle files are written to disk, that often gives rise to unpredictable delays. In our research paper <http://www.cs.berkeley.edu/~matei/papers/2013/sosp_spark_streaming.pdf> we achieved much higher performance using in-memory shuffling. However, the current version of Spark does not have in-memory shuffling, for a number of reasons (preventing the RDD cache from being polluted with single-use shuffle data, reducing the chance of OOMs, etc.).
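To make point 1 concrete, here is a minimal sketch of creating several receivers and unioning them into one DStream. The socket source, host, ports, receiver count, and app name are all illustrative:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf().setAppName("MultiReceiverSketch")
val ssc = new StreamingContext(sparkConf, Seconds(1))

// Several input streams of the same type, each backed by its own receiver,
// so receiving (and its deserialization cost) is spread across workers.
val numReceivers = 4
val streams = (0 until numReceivers).map(i => ssc.socketTextStream("localhost", 9000 + i))

// Union them into a single DStream for downstream processing.
val unified = ssc.union(streams)
```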
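Similarly for point 2, a sketch of repartitioning a DStream before the first heavy stage. The partition count is illustrative; a value roughly matching the total cores in the cluster is a reasonable starting point:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf().setAppName("RepartitionSketch")
val ssc = new StreamingContext(sparkConf, Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999)

// Spread the first map stage across more workers before the per-record work.
val counts = lines
  .repartition(32)
  .flatMap(_.split(" "))
  .map(word => (word, 1L))
  .reduceByKey(_ + _)
```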
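Point 3 is a one-line conf change; a sketch (app name illustrative):

```scala
import org.apache.spark.SparkConf

// Switch the serializer used for shuffle data and cached RDDs to Kryo,
// which is considerably faster and more compact than Java serialization.
val sparkConf = new SparkConf()
  .setAppName("KryoSketch")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
```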
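For point 4, one way to pass the CMS flag to executors is via extra Java options; -XX:+UseConcMarkSweepGC is the standard HotSpot switch. The same flag can also be set in spark-defaults.conf or on the spark-submit command line:

```scala
import org.apache.spark.SparkConf

// Run executor JVMs with the Concurrent Mark-Sweep collector for
// shorter, more predictable GC pauses between batches.
val sparkConf = new SparkConf()
  .setAppName("CmsGcSketch")
  .set("spark.executor.extraJavaOptions", "-XX:+UseConcMarkSweepGC")
```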
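For point 5, a hypothetical byte-array version of extractWordOnePairs (the function name comes from the discussion above; the split-on-single-space behavior is an assumption). The keys are Vector[Byte] rather than Array[Byte], because arrays compare by reference and would break a downstream reduceByKey:

```scala
import scala.collection.mutable.ArrayBuffer

// Emit (word, 1) pairs without ever constructing java.lang.String objects.
def extractWordOnePairs(line: Array[Byte]): Seq[(Vector[Byte], Long)] = {
  val pairs = ArrayBuffer[(Vector[Byte], Long)]()
  var start = 0
  var i = 0
  while (i <= line.length) {
    // Word boundary: a space byte or the end of the line.
    if (i == line.length || line(i) == ' '.toByte) {
      if (i > start) pairs += ((line.slice(start, i).toVector, 1L))
      start = i + 1
    }
    i += 1
  }
  pairs
}
```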
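Since the shuffle files mentioned above go to whatever spark.local.dir points at, directing that at the fastest device available can reduce the disk-induced pauses. A sketch (the path and app name are illustrative):

```scala
import org.apache.spark.SparkConf

// Point Spark's scratch space (shuffle files, spilled data) at a fast disk,
// e.g. an SSD or RAM-backed filesystem mounted on each worker.
val sparkConf = new SparkConf()
  .setAppName("LocalDirSketch")
  .set("spark.local.dir", "/mnt/fast-ssd/spark-local")
```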
It is recommended that you use a fast file system (SSD? RAMFS?) on the workers and make sure the local working directory uses it. If you are interested in hacking in-memory shuffling into Spark, you can modify org.apache.spark.storage.* (especially BlockManager) to make sure all shuffle-related "blocks" go to the MemoryStore (look out for the code that deals with shuffle blocks and passes them directly to DiskBlockManager). We are in the process of building in-memory shuffle into Spark, but it's hard to give an ETA on that as of now.

To achieve even higher throughput (specifically the numbers reported in the paper), it requires pre-serializing the data into Spark's format (Kryo format, if Kryo serialization is used) as large byte arrays, receiving the byte arrays directly, and storing them in the receiver directly (without deserializing them). This avoids the serialization-deserialization cost of receiving the data and can give higher throughput overall.

Let me know if this helps, or if there is more detailed performance tuning you are interested in.

TD

On Fri, Jun 13, 2014 at 11:22 AM, Michael Chang <m...@tellapart.com> wrote:

> I'm interested in this issue as well. I have spark streaming jobs that
> seem to run well for a while, but slowly degrade and don't recover.
>
>
> On Wed, Jun 11, 2014 at 11:08 PM, Boduo Li <onpo...@gmail.com> wrote:
>
>> It seems that the slow "reduce" tasks are caused by slow shuffling.
>> Here are the logs regarding one slow "reduce" task:
>>
>> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got remote block shuffle_69_88_18 after 5029 ms
>> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got remote block shuffle_69_89_18 after 5029 ms
>> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got remote block shuffle_69_90_18 after 5029 ms
>> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got remote block shuffle_69_91_18 after 5029 ms
>> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got remote block shuffle_69_92_18 after 5029 ms
>> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got remote block shuffle_69_93_18 after 5029 ms
>> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got remote block shuffle_69_94_18 after 5029 ms
>> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got remote block shuffle_69_95_18 after 5029 ms
>> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got remote block shuffle_69_96_18 after 5029 ms
>> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got remote block shuffle_69_97_18 after 5029 ms
>> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got remote block shuffle_69_188_18 after 5029 ms
>> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got remote block shuffle_69_189_18 after 5029 ms
>> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got remote block shuffle_69_190_18 after 5029 ms
>> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got remote block shuffle_69_191_18 after 5029 ms
>> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got remote block shuffle_69_192_18 after 5029 ms
>> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got remote block shuffle_69_193_18 after 5029 ms
>> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got remote block shuffle_69_194_18 after 5029 ms
>> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got remote block shuffle_69_195_18 after 5029 ms
>> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got remote block shuffle_69_196_18 after 5029 ms
>> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got remote block shuffle_69_197_18 after 5029 ms
>> 14/06/11 23:42:45 INFO Executor: Serialized size of result for 23643 is 1143
>> 14/06/11 23:42:45 INFO Executor: Sending result for 23643 directly to driver
>> 14/06/11 23:42:45 INFO Executor: Finished task ID 23643
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-achieve-reasonable-performance-on-Spark-Streaming-tp7262p7454.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.