Sorry for that, but there is a mistake in the second sample; here
is the right one:

// fails with either OOM or 'Container killed by YARN for exceeding
// memory limits ... spark.yarn.executor.memoryOverhead'
rdd
  .map(item => item._1.toString -> item._2.toString)
  .repartitionAndSortWithinPartitions(new HashPartitioner(1000))
  .coalesce(20,false)
  .count

// works as expected
rdd
  .map(item => item._1.toString -> item._2.toString)
  .repartitionAndSortWithinPartitions(new HashPartitioner(1000))
  .coalesce(20,true)
  .count
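
If it helps, the difference between the two variants is also visible in the
lineage without running the job (same rdd as above):

// coalesce(20,false): no extra shuffle, so the 1000-partition reduce stage
// ends up running only 20 tasks
rdd
  .map(item => item._1.toString -> item._2.toString)
  .repartitionAndSortWithinPartitions(new HashPartitioner(1000))
  .coalesce(20,false)
  .toDebugString

// coalesce(20,true): introduces its own shuffle, so the first shuffle keeps
// its 1000 reduce tasks
rdd
  .map(item => item._1.toString -> item._2.toString)
  .repartitionAndSortWithinPartitions(new HashPartitioner(1000))
  .coalesce(20,true)
  .toDebugString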
On Fri, Oct 12, 2018 at 7:20 PM Sergey Zhemzhitsky <szh.s...@gmail.com> wrote:
>
> I'd like to reduce the number of files written to HDFS without
> introducing additional shuffles and, at the same time, preserve the
> stability of the job. I'd also like to understand why the samples
> below work in one case and fail in the other.
>
> Consider the following example, which does the same thing using the
> same resources, but fails in one case and works without issues in the
> other once an additional shuffle is introduced:
>
> spark-shell \
>   --num-executors=5 \
>   --executor-cores=2 \
>   --master=yarn-client \
>   --conf spark.executor.memory=4g \
>   --conf spark.executor.memoryOverhead=1024 \
>   --conf spark.dynamicAllocation.enabled=false
>
> import org.apache.hadoop.io._
> import org.apache.hadoop.io.compress._
> import org.apache.commons.lang._
> import org.apache.spark._
>
> // generate 100M records of sample data
> sc.makeRDD(1 to 1000, 1000)
>   .flatMap(item => (1 to 100000)
>     .map(i =>
>       new Text(RandomStringUtils.randomAlphanumeric(3).toLowerCase) ->
>         new Text(RandomStringUtils.randomAlphanumeric(1024)))
>   )
>   .saveAsSequenceFile("/tmp/random-strings", Some(classOf[GzipCodec]))
> val rdd = sc.sequenceFile("/tmp/random-strings", classOf[Text], classOf[Text])
>
> // count unique keys
> rdd.keys.map(_.toString).distinct.count
> // in my case it's equal to 46656
>
> // fails with either OOM or 'Container killed by YARN for exceeding
> // memory limits ... spark.yarn.executor.memoryOverhead'
> rdd
>   .map(item => item._1.toString -> item._2.toString)
>   .repartitionAndSortWithinPartitions(new HashPartitioner(1000))
>   .coalesce(20,false)
>   .count
>
> // works as expected
> rdd
>   .map(item => item._1.toString -> item._2.toString)
>   .repartitionAndSortWithinPartitions(new HashPartitioner(1000))
>   .coalesce(20,false)
>   .count
> On Wed, Oct 10, 2018 at 4:06 PM Wenchen Fan <cloud0...@gmail.com> wrote:
> >
> > Note that RDD partitions and Spark tasks are not always in a 1-1 mapping.
> >
> > Assuming `rdd1` has 100 partitions, and `rdd2 = rdd1.coalesce(10)`. Then
> > `rdd2` has 10 partitions, and there is no shuffle between `rdd1` and
> > `rdd2`. During scheduling, `rdd1` and `rdd2` are in the same stage, and
> > this stage has 10 tasks (decided by the last RDD). This means each Spark
> > task will process 10 partitions of `rdd1`.
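> >
> > For example, a minimal sketch in spark-shell (toy data, just to show the
> > stage layout):
> >
> > val rdd1 = sc.parallelize(1 to 1000, 100)
> > val rdd2 = rdd1.coalesce(10)   // no shuffle
> > rdd2.getNumPartitions          // 10
> > rdd2.toDebugString             // CoalescedRDD and ParallelCollectionRDD in a single stage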
> >
> > Looking at your example, I don't see where the problem is. Can you describe
> > what is not expected?
> >
> > On Wed, Oct 10, 2018 at 2:11 PM Sergey Zhemzhitsky <szh.s...@gmail.com> 
> > wrote:
> >>
> >> Well, it seems that I can still extend CoalescedRDD to make it preserve
> >> the total number of partitions of the parent RDD, reduce some partitions
> >> in the same way as the original coalesce does for map-only jobs, and fill
> >> the gaps (partitions which should reside at the positions of the coalesced
> >> ones) with a special kind of partition which has no parent
> >> dependencies and always returns an empty iterator.
> >>
> >> I believe this should work as desired (at least the previous
> >> ShuffleMapStage will think that the number of partitions in the next stage,
> >> the one it generates shuffle output for, has not changed).
> >>
> >> There are a few issues though: the existence of empty partitions, which can
> >> be evaluated almost for free, and empty output files produced by these empty
> >> partitions, which can be avoided by means of LazyOutputFormat in the case of
> >> RDDs.
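> >>
> >> For the empty output files, something along these lines should work (an
> >> untested sketch; the output path and the wrapped SequenceFileOutputFormat
> >> are just placeholders):
> >>
> >> import org.apache.hadoop.io.Text
> >> import org.apache.hadoop.mapreduce.Job
> >> import org.apache.hadoop.mapreduce.lib.output.{LazyOutputFormat, SequenceFileOutputFormat}
> >>
> >> val job = Job.getInstance(sc.hadoopConfiguration)
> >> // LazyOutputFormat creates the record writer (and hence the part file)
> >> // only when the first record is actually written
> >> LazyOutputFormat.setOutputFormatClass(job, classOf[SequenceFileOutputFormat[Text, Text]])
> >> rdd.saveAsNewAPIHadoopFile(
> >>   "/tmp/coalesced-output",
> >>   classOf[Text], classOf[Text],
> >>   classOf[LazyOutputFormat[Text, Text]],
> >>   job.getConfiguration)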
> >>
> >>
> >>
> >> On Mon, Oct 8, 2018, 23:57 Koert Kuipers <ko...@tresata.com> wrote:
> >>>
> >>> although i personally would describe this as a bug, the answer will be
> >>> that this is the intended behavior. the coalesce "infects" the shuffle
> >>> before it, making a coalesce useless for reducing output files after a
> >>> shuffle with many partitions, by design.
> >>>
> >>> your only option left is a repartition, for which you pay the price in
> >>> that it introduces another expensive shuffle.
> >>>
> >>> interestingly, if you do a coalesce on a map-only job it knows how to
> >>> reduce the partitions and output files without introducing a shuffle, so
> >>> clearly it is possible, but i don't know how to get this behavior after a
> >>> shuffle in an existing job.
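> >>>
> >>> for example, a quick illustration (made-up paths), where coalesce reduces a
> >>> map-only job down to 20 output files with no shuffle at all:
> >>>
> >>> sc.textFile("/tmp/input")
> >>>   .map(_.toUpperCase)
> >>>   .coalesce(20)   // shuffle = false by default, still a single stage
> >>>   .saveAsTextFile("/tmp/output")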
> >>>
> >>> On Fri, Oct 5, 2018 at 6:34 PM Sergey Zhemzhitsky <szh.s...@gmail.com> 
> >>> wrote:
> >>>>
> >>>> Hello guys,
> >>>>
> >>>> Currently I'm a little bit confused by coalesce behaviour.
> >>>>
> >>>> Consider the following use case: I'd like to join two pretty big RDDs.
> >>>> To make the join more stable and to prevent it from failing with OOM, the
> >>>> RDDs are usually repartitioned to redistribute the data more evenly and to
> >>>> keep every partition below the 2GB limit, which means the join runs with a
> >>>> lot of partitions.
> >>>>
> >>>> Then, after a successful join, I'd like to save the resulting dataset.
> >>>> But I don't need as many files as there are partitions/tasks during the
> >>>> join. I'm actually fine with as many files as the total number of executor
> >>>> cores allocated to the job. So I've considered using coalesce.
> >>>>
> >>>> The problem is that coalesce with shuffling disabled prevents the join
> >>>> from using the specified number of partitions and instead forces the join
> >>>> to use the number of partitions passed to coalesce:
> >>>>
> >>>> scala> sc.makeRDD(1 to 100, 20).repartition(100).coalesce(5,
> >>>> false).toDebugString
> >>>> res5: String =
> >>>> (5) CoalescedRDD[15] at coalesce at <console>:25 []
> >>>>  |  MapPartitionsRDD[14] at repartition at <console>:25 []
> >>>>  |  CoalescedRDD[13] at repartition at <console>:25 []
> >>>>  |  ShuffledRDD[12] at repartition at <console>:25 []
> >>>>  +-(20) MapPartitionsRDD[11] at repartition at <console>:25 []
> >>>>     |   ParallelCollectionRDD[10] at makeRDD at <console>:25 []
> >>>>
> >>>> With shuffling enabled everything is ok, e.g.
> >>>>
> >>>> scala> sc.makeRDD(1 to 100, 20).repartition(100).coalesce(5, 
> >>>> true).toDebugString
> >>>> res6: String =
> >>>> (5) MapPartitionsRDD[24] at coalesce at <console>:25 []
> >>>>  |  CoalescedRDD[23] at coalesce at <console>:25 []
> >>>>  |  ShuffledRDD[22] at coalesce at <console>:25 []
> >>>>  +-(100) MapPartitionsRDD[21] at coalesce at <console>:25 []
> >>>>      |   MapPartitionsRDD[20] at repartition at <console>:25 []
> >>>>      |   CoalescedRDD[19] at repartition at <console>:25 []
> >>>>      |   ShuffledRDD[18] at repartition at <console>:25 []
> >>>>      +-(20) MapPartitionsRDD[17] at repartition at <console>:25 []
> >>>>         |   ParallelCollectionRDD[16] at makeRDD at <console>:25 []
> >>>>
> >>>> In that case the problem is that for pretty huge datasets the additional
> >>>> reshuffling can take hours, or at least an amount of time comparable to
> >>>> the join itself.
> >>>>
> >>>> So I'd like to understand whether this is a bug or just expected
> >>>> behaviour.
> >>>> In case it is expected, is there any way to insert an additional
> >>>> ShuffleMapStage at the appropriate position in the DAG, but without
> >>>> the reshuffling itself?
> >>>>

---------------------------------------------------------------------
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
