If Spark wants to compete as an alternative to MapReduce on Hadoop clusters, then the assumption should not be that 99.9% of the time the data will fit in memory. It will not.
However, that said, I am fine with a solution where one has to use DISK_ONLY for this, since that is exactly what MapReduce does anyhow.

On Mon, Nov 11, 2013 at 8:14 PM, Xia, Junluan <junluan....@intel.com> wrote:

> Hi Kyle
>
> I totally agree with you. The 'best' solution currently is to handle only the "DISK_ONLY" scenario and put the iterator directly to the BlockManager.
>
> It is too expensive for us to make the code complicated for only a 0.1% possibility before we have a perfect solution.
>
> -----Original Message-----
> From: Kyle Ellrott [mailto:kellr...@soe.ucsc.edu]
> Sent: Tuesday, November 12, 2013 6:28 AM
> To: dev@spark.incubator.apache.org
> Subject: Re: SPARK-942
>
> The problem is that the iterator interface only defines 'hasNext' and 'next' methods, so I don't think there is really any way to estimate the total count until the iterator is done traversing. In my particular case, I'm wrapping an OpenRDF RIO iterator that is parsing a gzip file stream, and one of the files just happens to be several gigabytes large. The individual elements spit out by the iterator are all the same; it is just that sometimes it spits out a few million more of them than normal.
>
> It's not a normal occurrence. 99.9% of the time, when people call 'flatMap' they will probably be producing arrays that fit nicely into memory. Doing a bunch of extra bookkeeping (i.e. unrolling the iterator one element at a time and checking whether it has gotten too big yet) may be an extra complication that makes the code much more complicated while only providing a solution for extreme edge cases.
>
> I think the 'best' way to go would be to leave the 'MEMORY_ONLY' and 'MEMORY_AND_DISK' behaviors the same. If users know that their code could produce these 'mega-iterators', then they pass 'DISK_ONLY' and the iterator gets handed straight to the BlockManager to be written straight to disk. Then all we have to do is change "def put(blockId: BlockId, values: Iterator[Any], level: StorageLevel, tellMaster: Boolean)" (BlockManager.scala:452) to call 'diskStore.putValues' directly, rather than unrolling the iterator and passing it on to the standard 'doPut' as it does now.
>
> Kyle
>
> On Mon, Nov 11, 2013 at 5:18 AM, Xia, Junluan <junluan....@intel.com> wrote:
>
> > Hi
> >
> > I think it is a bad user experience to throw an OOM exception when the user only persists the RDD with DISK_ONLY or MEMORY_AND_DISK.
> >
> > As Kyle mentioned below, the key point is that the CacheManager unrolls the whole iterator into an ArrayBuffer without any free-memory check. We should estimate the size of the unrolled iterator and check whether it is beyond the current free memory.
> >
> > We can separate this into three scenarios:
> >
> > 1. For MEMORY_ONLY, I think it is normal to throw an OOM exception and require the user to adjust their application.
> > 2. For MEMORY_AND_DISK, we should check whether free memory can hold the unrolled ArrayBuffer; if yes, it goes down the usual path, and if not, we degrade it to DISK_ONLY.
> > 3. For DISK_ONLY, we need not unroll the whole iterator into an ArrayBuffer, because we can write the iterator to disk element by element.
> >
> > So the issue is how to judge whether free memory can hold the unrolled iterator before it actually becomes an ArrayBuffer.
> >
> > Is there any solution for this case? Could we just unroll the first 10% of the iterator into an ArrayBuffer, estimate that size, and take the total size to be 10x the size of the sample? Apparently that is not perfect.
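As a rough illustration of scenario 3 above (writing the iterator to disk element by element instead of unrolling it), here is a minimal, self-contained sketch. It does not use the real BlockManager/DiskStore APIs; plain Java serialization stands in for Spark's serializer plumbing, and the helper name is made up:

    import java.io.{BufferedOutputStream, File, FileOutputStream, ObjectOutputStream}

    // Hypothetical helper: stream an iterator straight to a local file without
    // ever materializing it in an ArrayBuffer. Elements are assumed to be
    // java.io.Serializable. Returns the number of bytes written to the file.
    def putIteratorToDisk(file: File, values: Iterator[Any]): Long = {
      val out = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(file)))
      try {
        var count = 0L
        while (values.hasNext) {
          out.writeObject(values.next().asInstanceOf[AnyRef]) // one element at a time, constant memory
          count += 1
          if (count % 10000 == 0) out.reset() // drop ObjectOutputStream's back-references
        }
      } finally {
        out.close()
      }
      file.length()
    }

Memory use stays bounded by the largest single element, no matter how many elements the iterator produces, which is exactly the property the DISK_ONLY path needs.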
> >
> > -----Original Message-----
> > From: Kyle Ellrott [mailto:kellr...@soe.ucsc.edu]
> > Sent: Thursday, November 07, 2013 2:59 AM
> > To: dev@spark.incubator.apache.org
> > Subject: Re: SPARK-942
> >
> > I think the usage has to be calculated as the iterator is being put into the ArrayBuffer. Right now the BlockManager, in its put method, when it gets an iterator named 'values', uses the simple stanza of:
> >
> > def put(blockId: BlockId, values: Iterator[Any], level: StorageLevel, tellMaster: Boolean): Long = {
> >   val elements = new ArrayBuffer[Any]
> >   elements ++= values
> >   put(blockId, elements, level, tellMaster)
> > }
> >
> > completely unrolling the iterator in a single line. Above it, the CacheManager does the exact same thing with:
> >
> > val elements = new ArrayBuffer[Any]
> > elements ++= computedValues
> > blockManager.put(key, elements, storageLevel, tellMaster = true)
> >
> > We would probably have to implement some sort of 'IteratorBuffer' class, which would wrap an iterator. It would include a method to unroll the iterator into a buffer up to a point, something like
> >
> > def unroll(maxMem: Long): Boolean = { ... }
> >
> > and it would return true if maxMem was hit. At that point the BlockManager could read through the already cached values, then continue on through the rest of the iterator, dumping all the values to file. If it unrolled without hitting maxMem (which would probably be most of the time), the class would simply wrap the ArrayBuffer of cached values.
> >
> > Kyle
> >
> > On Sun, Nov 3, 2013 at 12:50 AM, Reynold Xin <r...@apache.org> wrote:
> >
> > > It's not a very elegant solution, but one possibility is for the CacheManager to check whether it will have enough space. If it is running out of space, it skips buffering the output of the iterator and writes the output of the iterator directly to disk (if the storage level allows that).
> > >
> > > But it is still tricky to know whether we will run out of space before we even start running the iterator. One possibility is to use sizing data from previous partitions to estimate the size of the current partition (i.e. estimated in-memory size = current input size * the average ratio of in-memory size to input size seen so far).
> > >
> > > Do you have any ideas on this one, Kyle?
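To make the 'IteratorBuffer' idea above concrete, here is a rough, self-contained sketch. The class and method names are hypothetical, and the per-element sizeOf function is a stand-in for whatever size estimation Spark would actually use (for example, a heuristic based on previous partitions, as Reynold suggests):

    import scala.collection.mutable.ArrayBuffer

    // Hypothetical wrapper: unrolls an iterator into a buffer until a memory cap
    // is hit, keeping the un-consumed remainder available for streaming to disk.
    class IteratorBuffer[T](underlying: Iterator[T], sizeOf: T => Long) {
      val buffered = new ArrayBuffer[T]
      private var used = 0L

      // Returns true if maxMem was hit before the iterator was exhausted.
      def unroll(maxMem: Long): Boolean = {
        while (underlying.hasNext && used < maxMem) {
          val elem = underlying.next()
          buffered += elem
          used += sizeOf(elem)
        }
        underlying.hasNext
      }

      // Elements that were not unrolled; empty if unroll() returned false.
      def remainder: Iterator[T] = underlying

      // The full sequence: the buffered prefix followed by the remainder.
      def iterator: Iterator[T] = buffered.iterator ++ underlying
    }

A DISK_ONLY caller could then write 'buffered' out first and stream 'remainder' straight to disk, while callers that never hit the cap would behave exactly as they do today, simply wrapping the ArrayBuffer of cached values.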
> > >
> > > On Sat, Oct 26, 2013 at 10:53 AM, Kyle Ellrott <kellr...@soe.ucsc.edu> wrote:
> > >
> > > > I was wondering if anybody had any thoughts on the best way to tackle SPARK-942 (https://spark-project.atlassian.net/browse/SPARK-942).
> > > > Basically, Spark takes an iterator from a flatMap call, and because I tell it that it needs to persist, Spark proceeds to push it all into an array before deciding that it doesn't have enough memory and trying to serialize it to disk, and somewhere along the line it runs out of memory. For my particular operation, the function returns an iterator that reads data out of a file, and the size of the files passed to that function can vary greatly (from a few kilobytes to a few gigabytes). The funny thing is that if I do a straight 'map' operation after the flatMap, everything works, because Spark just passes the iterator forward and never tries to expand the whole thing into memory. But I need to do a reduceByKey across all the records, so I'd like to persist to disk first, and that is where I hit this snag.
> > > >
> > > > I've already set up a unit test to replicate the problem, and I know the area of the code that would need to be fixed. I'm just hoping for some tips on the best way to fix the problem.
> > > >
> > > > Kyle
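For anyone who wants to reproduce the shape of the problem without the OpenRDF parser, something along the following lines exercises the same code path. The file paths and the line-based "parsing" are made up; the point is just that flatMap returns a lazy iterator over a file that may be huge, and persisting the RDD forces that iterator to be unrolled:

    import java.io.FileInputStream
    import java.util.zip.GZIPInputStream
    import scala.io.Source

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark.storage.StorageLevel

    object Spark942Repro {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext("local", "spark-942-repro")

        // One gzip file per input record; file sizes range from kilobytes to gigabytes.
        val files = sc.parallelize(Seq("/data/part-000.gz", "/data/part-001.gz"))

        // flatMap hands back a lazy iterator; nothing is materialized yet.
        val records = files.flatMap { path =>
          Source.fromInputStream(new GZIPInputStream(new FileInputStream(path))).getLines()
        }

        // Because the RDD is persisted, the CacheManager unrolls each partition's
        // iterator into an ArrayBuffer when it is computed, which is where the
        // OOM shows up for the multi-gigabyte files.
        records.persist(StorageLevel.DISK_ONLY)

        // The reduceByKey is the reason for wanting the persist in the first place.
        val counts = records.map(line => (line, 1)).reduceByKey(_ + _)
        println(counts.count())
      }
    }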