If Spark wants to compete as an alternative to MapReduce on Hadoop
clusters, then the assumption should not be that data will fit in memory
99.9% of the time. It will not.

That said, I am fine with a solution where one has to use DISK_ONLY
for this, since that is exactly what MapReduce does anyhow.


On Mon, Nov 11, 2013 at 8:14 PM, Xia, Junluan <junluan....@intel.com> wrote:

> Hi Kyle
>
> I totally agree with you. The 'best' solution currently is to handle only
> the "DISK_ONLY" scenario and put the iterator directly into the BlockManager.
>
> It is too expensive for us to complicate the code for only a 0.1%
> possibility before we have a perfect solution.
>
> -----Original Message-----
> From: Kyle Ellrott [mailto:kellr...@soe.ucsc.edu]
> Sent: Tuesday, November 12, 2013 6:28 AM
> To: dev@spark.incubator.apache.org
> Subject: Re: SPARK-942
>
> The problem is that the iterator interface only defines 'hasNext' and
> 'next' methods, so I don't think there is really any way to estimate the
> total count until the iterator is done traversing. In my particular case,
> I'm wrapping an OpenRDF RIO iterator that is parsing a gzipped file
> stream, and one of the files just happens to be several gigabytes large.
> The individual elements spit out by the iterator are all alike; it's just
> that sometimes it spits out a few million more of them than normal.
>
> It's not a normal occurrence. 99.9% of the time, when people call 'flatMap'
> they will probably be producing arrays that fit nicely into memory. Doing a
> bunch of extra bookkeeping (i.e. unrolling the iterator one element at a
> time and trying to figure out whether it has gotten too big yet) may add a
> lot of complexity that only solves an extreme edge case.
>
> I think the 'best' way to go would be to leave the 'MEMORY_ONLY' and
> 'MEMORY_AND_DISK' behaviors the same. If users know that their code could
> produce these 'mega-iterators', they pass 'DISK_ONLY' and that iterator
> gets handed straight to the BlockManager to be written directly to disk.
> Then all we have to do is change "def put(blockId: BlockId, values:
> Iterator[Any], level: StorageLevel, tellMaster: Boolean)"
> (BlockManager.scala:452) to call 'diskStore.putValues' directly, rather
> than unrolling the iterator and passing it on to the standard 'doPut' as
> it does now.
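>
> Something like this, just a rough, self-contained sketch; the helper names
> are made up and the real BlockManager/DiskStore internals obviously differ:
>
> import scala.collection.mutable.ArrayBuffer
>
> object PutSketch {
>   sealed trait StorageLevel
>   case object MEMORY_ONLY extends StorageLevel
>   case object MEMORY_AND_DISK extends StorageLevel
>   case object DISK_ONLY extends StorageLevel
>
>   // Stand-in for diskStore.putValues: stream the iterator element by
>   // element so nothing ever gets unrolled into memory.
>   def writeIteratorToDisk(blockId: String, values: Iterator[Any]): Long = {
>     var written = 0L
>     values.foreach { _ => written += 1 } // real code would serialize to a file here
>     written
>   }
>
>   // Stand-in for the usual doPut path that works on an unrolled buffer.
>   def putUnrolled(blockId: String, values: ArrayBuffer[Any],
>                   level: StorageLevel): Long = values.length.toLong
>
>   // The proposed change: DISK_ONLY iterators bypass unrolling entirely.
>   def put(blockId: String, values: Iterator[Any], level: StorageLevel): Long =
>     level match {
>       case DISK_ONLY => writeIteratorToDisk(blockId, values)
>       case _ =>
>         val elements = new ArrayBuffer[Any]
>         elements ++= values // existing behaviour: unroll into memory first
>         putUnrolled(blockId, elements, level)
>     }
> }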
>
> Kyle
>
>
>
>
> On Mon, Nov 11, 2013 at 5:18 AM, Xia, Junluan <junluan....@intel.com>
> wrote:
>
> > Hi
> >
> > I think it is a bad user experience to throw an OOM exception when the
> > user only persists the RDD with DISK_ONLY or MEMORY_AND_DISK.
> >
> > As Kyle mentioned below, the key point is that the CacheManager unrolls
> > the whole Iterator into an ArrayBuffer without any free-memory check. We
> > should estimate the size of the unrolled iterator object and check
> > whether it exceeds the current free memory.
> >
> > We could separate this into three scenarios:
> >
> > 1. For MEMORY_ONLY, I think it is normal to throw an OOM exception and
> > require the user to adjust the application.
> > 2. For MEMORY_AND_DISK, we should check whether free memory can hold the
> > unrolled ArrayBuffer; if yes, it goes down the usual path, and if no, we
> > degrade it to DISK_ONLY.
> > 3. For DISK_ONLY, we need not unroll the whole iterator into an
> > ArrayBuffer, because we can write the iterator to disk one element at a
> > time.
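> >
> > Purely as an illustration of that decision, not actual Spark code; every
> > name here is a placeholder:
> >
> > object PersistDecision {
> >   sealed trait Path
> >   case object UnrollToMemory extends Path // usual in-memory path
> >   case object StreamToDisk extends Path   // write iterator element by element
> >   case object FailWithOOM extends Path    // let the user adjust the application
> >
> >   def choose(level: String, estimatedSize: Long, freeMemory: Long): Path =
> >     level match {
> >       case "MEMORY_ONLY" =>
> >         if (estimatedSize <= freeMemory) UnrollToMemory else FailWithOOM
> >       case "MEMORY_AND_DISK" =>
> >         if (estimatedSize <= freeMemory) UnrollToMemory else StreamToDisk // degrade
> >       case "DISK_ONLY" =>
> >         StreamToDisk // never unroll; write one element at a time
> >       case _ =>
> >         UnrollToMemory
> >     }
> > }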
> >
> > So the issue is how to judge whether the free memory can hold the
> > unrolled iterator before it becomes an ArrayBuffer.
> >
> > Is there any solution for this case? Could we just unroll the first 10%
> > of the iterator into an ArrayBuffer, estimate its size, and take the
> > total size to be 10x the size of that 10%? Apparently it is not perfect.
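> >
> > For illustration only, a sketch of that sampling idea; the sizeOf
> > function is a stand-in for whatever per-object size estimator is
> > available, and this assumes the total element count is known up front,
> > which in general it is not:
> >
> > import scala.collection.mutable.ArrayBuffer
> >
> > object SampleEstimate {
> >   // Unroll the first sampleCount elements, then extrapolate the total size.
> >   def estimateTotalSize[A](it: Iterator[A], sampleCount: Int,
> >                            totalCount: Long,
> >                            sizeOf: A => Long): (ArrayBuffer[A], Long) = {
> >     val sample = new ArrayBuffer[A]
> >     var sampleBytes = 0L
> >     while (it.hasNext && sample.length < sampleCount) {
> >       val v = it.next()
> >       sample += v
> >       sampleBytes += sizeOf(v)
> >     }
> >     // total ~= bytes per sampled element * total element count
> >     val estimate =
> >       if (sample.isEmpty) 0L
> >       else (sampleBytes.toDouble / sample.length * totalCount).toLong
> >     (sample, estimate)
> >   }
> > }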
> >
> > -----Original Message-----
> > From: Kyle Ellrott [mailto:kellr...@soe.ucsc.edu]
> > Sent: Thursday, November 07, 2013 2:59 AM
> > To: dev@spark.incubator.apache.org
> > Subject: Re: SPARK-942
> >
> > I think the usage has to be calculated as the iterator is being put
> > into the ArrayBuffer.
> > Right now the BlockManager, in its put method, when it gets an iterator
> > named 'values', uses the simple stanza of:
> >
> > def put(blockId: BlockId, values: Iterator[Any], level: StorageLevel,
> >         tellMaster: Boolean): Long = {
> >   val elements = new ArrayBuffer[Any]
> >   elements ++= values
> >   put(blockId, elements, level, tellMaster)
> > }
> >
> >
> > Completely unrolling the iterator in a single line.  Above it, the
> > CacheManager does the exact same thing with:
> >
> > val elements = new ArrayBuffer[Any]
> > elements ++= computedValues
> > blockManager.put(key, elements, storageLevel, tellMaster = true)
> >
> >
> > We would probably have to implement some sort of 'IteratorBuffer'
> > class that wraps an iterator. It would include a method to unroll the
> > iterator into a buffer up to a point, something like:
> >
> > def unroll(maxMem: Long): Boolean = { ... }
> >
> > It would return true if maxMem was hit, at which point the
> > BlockManager could read through the already-buffered values and then
> > continue through the rest of the iterator, dumping all the values to a
> > file. If it unrolled without hitting maxMem (which would probably be
> > most of the time), the class would simply wrap the ArrayBuffer of
> > buffered values.
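> >
> > A rough sketch of what that wrapper might look like; the names, and the
> > fact that sizing is delegated to a sizeOf callback, are just assumptions
> > for illustration:
> >
> > import scala.collection.mutable.ArrayBuffer
> >
> > class IteratorBuffer[A](underlying: Iterator[A], sizeOf: A => Long) {
> >   private val buffered = new ArrayBuffer[A]
> >   private var bufferedBytes = 0L
> >
> >   // Unroll into the buffer until maxMem is hit or the iterator is done.
> >   // Returns true if maxMem was hit (i.e. the block should go to disk).
> >   def unroll(maxMem: Long): Boolean = {
> >     while (underlying.hasNext && bufferedBytes < maxMem) {
> >       val v = underlying.next()
> >       buffered += v
> >       bufferedBytes += sizeOf(v)
> >     }
> >     underlying.hasNext // elements left over means the budget was hit
> >   }
> >
> >   // Replays what was already buffered, then the untouched remainder, so
> >   // a caller can stream everything to disk without re-reading the source.
> >   def iterator: Iterator[A] = buffered.iterator ++ underlying
> >
> >   // Only meaningful when unroll() returned false: everything fit in memory.
> >   def values: ArrayBuffer[A] = buffered
> > }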
> >
> > Kyle
> >
> >
> >
> > On Sun, Nov 3, 2013 at 12:50 AM, Reynold Xin <r...@apache.org> wrote:
> >
> > > It's not a very elegant solution, but one possibility is for the
> > > CacheManager to check whether it will have enough space. If it is
> > > running out of space, it skips buffering the output of the iterator
> > > and writes the iterator's output directly to disk (if the storage
> > level allows that).
> > >
> > > But it is still tricky to know whether we will run out of space
> > > before we even start running the iterator. One possibility is to use
> > > sizing data from previous partitions to estimate the size of the
> > > current
> > partition (i.e.
> > > estimated in-memory size = current input size x the average ratio of
> > > in-memory size to input size observed on previous partitions).
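> > >
> > > A tiny sketch of that heuristic, purely illustrative (the class and
> > > method names here are invented):
> > >
> > > class PartitionSizeEstimator {
> > >   private var totalInputBytes = 0L
> > >   private var totalInMemoryBytes = 0L
> > >
> > >   // Record what a finished partition actually cost.
> > >   def record(inputBytes: Long, inMemoryBytes: Long): Unit = {
> > >     totalInputBytes += inputBytes
> > >     totalInMemoryBytes += inMemoryBytes
> > >   }
> > >
> > >   // estimated in-memory size = input size * (observed in-memory / input ratio)
> > >   def estimate(inputBytes: Long): Option[Long] =
> > >     if (totalInputBytes == 0) None // no history yet, cannot predict
> > >     else Some((inputBytes.toDouble * totalInMemoryBytes / totalInputBytes).toLong)
> > > }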
> > >
> > > Do you have any ideas on this one, Kyle?
> > >
> > >
> > > On Sat, Oct 26, 2013 at 10:53 AM, Kyle Ellrott
> > > <kellr...@soe.ucsc.edu> wrote:
> > >
> > > > I was wondering if anybody had any thoughts on the best way to
> > > > tackle
> > > > SPARK-942 ( https://spark-project.atlassian.net/browse/SPARK-942 ).
> > > > Basically, Spark takes an iterator back from a flatMap call, and
> > > > because I tell it that it needs to persist, Spark proceeds to push
> > > > it all into an array before deciding that it doesn't have enough
> > > > memory and trying to serialize it to disk; somewhere along the line
> > > > it runs out of memory.
> > > > For my particular operation, the function returns an iterator that
> > > > reads data out of a file, and the sizes of the files passed to that
> > > > function can vary greatly (from a few kilobytes to a few gigabytes).
> > > > The funny thing is that if I do a straight 'map' operation after the
> > > > flatMap, everything works, because Spark just passes the iterator
> > > > forward and never tries to expand the whole thing into memory. But I
> > > > need to do a reduceByKey across all the records, so I'd like to
> > > > persist to disk first, and that is where I hit this snag.
> > > > I've already set up a unit test to replicate the problem, and I know
> > > > the area of the code that would need to be fixed.
> > > > I'm just hoping for some tips on the best way to fix the problem.
> > > >
> > > > Kyle
> > > >
> > >
> >
>
