... sorry for that, but there is a mistake in the second sample; here is the right one:
// fails with either OOM or 'Container killed by YARN for exceeding memory
// limits ... spark.yarn.executor.memoryOverhead'
rdd
  .map(item => item._1.toString -> item._2.toString)
  .repartitionAndSortWithinPartitions(new HashPartitioner(1000))
  .coalesce(20, false)
  .count

// works as expected
rdd
  .map(item => item._1.toString -> item._2.toString)
  .repartitionAndSortWithinPartitions(new HashPartitioner(1000))
  .coalesce(20, true)
  .count

On Fri, Oct 12, 2018 at 7:20 PM Sergey Zhemzhitsky <szh.s...@gmail.com> wrote:
>
> I'd like to reduce the number of files written to HDFS without
> introducing additional shuffles and at the same time preserve the
> stability of the job, and I'd also like to understand why the samples
> below work in one case and fail in the other.
>
> Consider the following example, which does the same thing with the
> same resources, but fails in one case and works without issues in the
> other, where an additional shuffle is introduced:
>
> spark-shell \
>   --num-executors=5 \
>   --executor-cores=2 \
>   --master=yarn-client \
>   --conf spark.executor.memory=4g \
>   --conf spark.executor.memoryOverhead=1024 \
>   --conf spark.dynamicAllocation.enabled=false
>
> import org.apache.hadoop.io._
> import org.apache.hadoop.io.compress._
> import org.apache.commons.lang._
> import org.apache.spark._
>
> // generate 100M records of sample data
> sc.makeRDD(1 to 1000, 1000)
>   .flatMap(item => (1 to 100000)
>     .map(i => new Text(RandomStringUtils.randomAlphanumeric(3).toLowerCase) ->
>               new Text(RandomStringUtils.randomAlphanumeric(1024)))
>   )
>   .saveAsSequenceFile("/tmp/random-strings", Some(classOf[GzipCodec]))
>
> val rdd = sc.sequenceFile("/tmp/random-strings", classOf[Text], classOf[Text])
>
> // count unique keys
> rdd.keys.map(_.toString).distinct.count
> // in my case it's equal to 46656
>
> // fails with either OOM or 'Container killed by YARN for exceeding
> // memory limits ... spark.yarn.executor.memoryOverhead'
> rdd
>   .map(item => item._1.toString -> item._2.toString)
>   .repartitionAndSortWithinPartitions(new HashPartitioner(1000))
>   .coalesce(20, false)
>   .count
>
> // works as expected
> rdd
>   .map(item => item._1.toString -> item._2.toString)
>   .repartitionAndSortWithinPartitions(new HashPartitioner(1000))
>   .coalesce(20, false)
>   .count
>
> On Wed, Oct 10, 2018 at 4:06 PM Wenchen Fan <cloud0...@gmail.com> wrote:
> >
> > Note that RDD partitions and Spark tasks are not always in a 1-1 mapping.
> >
> > Assuming `rdd1` has 100 partitions and `rdd2 = rdd1.coalesce(10)`, then
> > `rdd2` has 10 partitions, and there is no shuffle between `rdd1` and
> > `rdd2`. During scheduling, `rdd1` and `rdd2` are in the same stage, and
> > this stage has 10 tasks (decided by the last RDD). This means each Spark
> > task will process 10 partitions of `rdd1`.
> >
> > Looking at your example, I don't see where the problem is. Can you
> > describe what is not expected?
> >
> > On Wed, Oct 10, 2018 at 2:11 PM Sergey Zhemzhitsky <szh.s...@gmail.com> wrote:
> >>
> >> Well, it seems that I can still extend CoalescedRDD to make it preserve
> >> the total number of partitions of the parent RDD, reduce some partitions
> >> in the same way the original coalesce does for map-only jobs, and fill
> >> the gaps (the partitions which should reside at the positions of the
> >> coalesced ones) with a special kind of partition which does not have any
> >> parent dependencies and always returns an empty iterator.
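(For reference, the partition-to-task collapsing Wenchen describes above is
easy to see in a plain spark-shell; this is only a minimal sketch and the
sizes are illustrative:)

val rdd1 = sc.parallelize(1 to 1000, 100)   // 100 partitions
val rdd2 = rdd1.coalesce(10)                // shuffle defaults to false

rdd2.getNumPartitions                       // 10
rdd2.toDebugString                          // no ShuffledRDD, so rdd1 and rdd2 share one stage

// each of the 10 tasks reads about 10 parent partitions, so every
// resulting partition holds roughly 100 of the 1000 records
rdd2.mapPartitions(it => Iterator(it.size)).collect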
> >>
> >> I believe this should work as desired (at least the previous
> >> ShuffleMapStage will think that the number of partitions in the next
> >> stage, the one it generates shuffle output for, is not changed).
> >>
> >> There are a few issues though: the existence of empty partitions, which
> >> can be evaluated almost for free, and empty output files produced by
> >> these empty partitions, which can be avoided by means of LazyOutputFormat
> >> in the case of RDDs.
> >>
> >> On Mon, Oct 8, 2018, 23:57 Koert Kuipers <ko...@tresata.com> wrote:
> >>>
> >>> although i personally would describe this as a bug, the answer will be
> >>> that this is the intended behavior. the coalesce "infects" the shuffle
> >>> before it, making a coalesce useless for reducing output files after a
> >>> shuffle with many partitions, by design.
> >>>
> >>> your only option left is a repartition, for which you pay the price in
> >>> that it introduces another expensive shuffle.
> >>>
> >>> interestingly, if you do a coalesce on a map-only job it knows how to
> >>> reduce the partitions and output files without introducing a shuffle,
> >>> so clearly it is possible, but i don't know how to get this behavior
> >>> after a shuffle in an existing job.
> >>>
> >>> On Fri, Oct 5, 2018 at 6:34 PM Sergey Zhemzhitsky <szh.s...@gmail.com> wrote:
> >>>>
> >>>> Hello guys,
> >>>>
> >>>> Currently I'm a little bit confused by the coalesce behaviour.
> >>>>
> >>>> Consider the following use case: I'd like to join two pretty big RDDs.
> >>>> To make the join more stable and to prevent it from failing with OOM,
> >>>> the RDDs are usually repartitioned to redistribute the data more evenly
> >>>> and to prevent every partition from hitting the 2GB limit, so the join
> >>>> runs with a lot of partitions.
> >>>>
> >>>> Then, after a successful join, I'd like to save the resulting dataset.
> >>>> But I don't need as many files as there were partitions/tasks during
> >>>> the join; I'm actually fine with the number of files being equal to the
> >>>> total number of executor cores allocated to the job. So I've considered
> >>>> using a coalesce.
> >>>>
> >>>> The problem is that coalesce with shuffling disabled prevents the join
> >>>> from using the specified number of partitions and instead forces the
> >>>> join to use the number of partitions provided to coalesce:
> >>>>
> >>>> scala> sc.makeRDD(1 to 100, 20).repartition(100).coalesce(5, false).toDebugString
> >>>> res5: String =
> >>>> (5) CoalescedRDD[15] at coalesce at <console>:25 []
> >>>>  |  MapPartitionsRDD[14] at repartition at <console>:25 []
> >>>>  |  CoalescedRDD[13] at repartition at <console>:25 []
> >>>>  |  ShuffledRDD[12] at repartition at <console>:25 []
> >>>>  +-(20) MapPartitionsRDD[11] at repartition at <console>:25 []
> >>>>     |   ParallelCollectionRDD[10] at makeRDD at <console>:25 []
> >>>>
> >>>> With shuffling enabled everything is OK, e.g.
> >>>>
> >>>> scala> sc.makeRDD(1 to 100, 20).repartition(100).coalesce(5, true).toDebugString
> >>>> res6: String =
> >>>> (5) MapPartitionsRDD[24] at coalesce at <console>:25 []
> >>>>  |  CoalescedRDD[23] at coalesce at <console>:25 []
> >>>>  |  ShuffledRDD[22] at coalesce at <console>:25 []
> >>>>  +-(100) MapPartitionsRDD[21] at coalesce at <console>:25 []
> >>>>      |   MapPartitionsRDD[20] at repartition at <console>:25 []
> >>>>      |   CoalescedRDD[19] at repartition at <console>:25 []
> >>>>      |   ShuffledRDD[18] at repartition at <console>:25 []
> >>>>      +-(20) MapPartitionsRDD[17] at repartition at <console>:25 []
> >>>>         |   ParallelCollectionRDD[16] at makeRDD at <console>:25 []
> >>>>
> >>>> In that case the problem is that for pretty huge datasets the
> >>>> additional reshuffling can take hours, or at least an amount of time
> >>>> comparable to the join itself.
> >>>>
> >>>> So I'd like to understand whether this is a bug or just the expected
> >>>> behaviour.
> >>>> In case it is expected, is there any way to insert an additional
> >>>> ShuffleMapStage into the appropriate position of the DAG, but without
> >>>> the reshuffling itself?
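For completeness, the LazyOutputFormat approach mentioned above (so that
partitions which turn out to be empty do not produce output files) could be
wired into an RDD save roughly as follows. This is only a sketch: the output
path and the wrapped SequenceFileOutputFormat are illustrative assumptions,
not part of the original job.

import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.output.{LazyOutputFormat, SequenceFileOutputFormat}

val job = Job.getInstance(sc.hadoopConfiguration)
// register the real output format behind LazyOutputFormat; record writers
// (and hence output files) are then created lazily, on the first record
LazyOutputFormat.setOutputFormatClass(job, classOf[SequenceFileOutputFormat[Text, Text]])

rdd
  .map(item => new Text(item._1.toString) -> new Text(item._2.toString))
  .saveAsNewAPIHadoopFile(
    "/tmp/random-strings-lazy",    // illustrative output path
    classOf[Text],
    classOf[Text],
    classOf[LazyOutputFormat[Text, Text]],
    job.getConfiguration)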