The problem is that the iterator interface only defines 'hasNext' and 'next' methods. So I don't think that there is really anyway to estimate the total count until the iterator is done traversing. In my particular case, I'm wrapping a OpenRDF RIO iterator, that is parsing a gzipfile stream. And one of the files just happens to be several gigabytes large. Each of the individual elements spit out by the iterator are all the same, just sometimes it spits out a few million more then normal.
It's not a normal occurrence. 99.9% of the time, when people call 'flatMap' they will probably be producing arrays that fit nicely into memory. Trying to do a bunch of extra book keeping (ie unrolling the iterator one at a time, trying to figure out if it's gotten too big yet), may be an extra complication that makes the code much more complicated while only providing a solution for extreme edge cases. I think the 'best' way to go would to leave the 'MEMORY_ONLY' and 'MEMORY_AND_DISK' behaviors the same. If the user knows that their code could produce these 'mega-iterators' then they pass a 'DISK_ONLY' and that iterator gets passed straight to the BlockManager to be written straight to disk. Then all we have to do is change "def put(blockId: BlockId, values: Iterator[Any], level: StorageLevel, tellMaster: Boolean)" (BlockManager.scala:452), to call 'diskStore.putValues' directly, rather then unrolling the iterator and passing it onto the stardard 'doPut' like it does now. Kyle On Mon, Nov 11, 2013 at 5:18 AM, Xia, Junluan <junluan....@intel.com> wrote: > Hi > > I think it is bad user experience to throw OOM exception when user only > persist the RDD with DISK_ONLY or MEMORY_ADN_DISK. > > As Kyle mentioned below, Key point is that CacheManager has unrolled the > total Iterator into ArrayBuffer without free memory check, we should > estimate size of unrolled iterator object and check if it is beyond current > free memory size. > > We could separate into three scenarios > > 1. For MEMORY_ONLY, I think it is normal case to throw OOM exception and > need user to adjust its application > 2. For MEMORY_AND_DISK, we should check if free memory could hold unrolled > Arraybuffer, if yes, then it will go with usual path, if no, we will > degrade it to DISK_ONLY > 3. For DIS_ONLY, I think that we need not to unroll total iterator into > ArrayBuffer, because we could write this iterator one by one to disk. > > So this issue is how to judge if free memory size could hold size of > unrolled iterator before it become Arraybuffer. > > Is there any solution for this case? Could we just unroll first 10% of > total iterator into ArrayBuffer, and estimate this size, and total size is > equal to 10* size of 10%? apparently it is not perfect. > > -----Original Message----- > From: Kyle Ellrott [mailto:kellr...@soe.ucsc.edu] > Sent: Thursday, November 07, 2013 2:59 AM > To: dev@spark.incubator.apache.org > Subject: Re: SPARK-942 > > I think the usage has to be calculated as the iterator is being put into > the arraybuffer. > Right now, the BlockManager, in it's put method when it gets an iterator > named 'values' uses the simple stanza of: > > def put(blockId: BlockId, values: Iterator[Any], level: StorageLevel, > tellMaster: Boolean) > : Long = { > val elements = new ArrayBuffer[Any] > elements ++= values > put(blockId, elements, level, tellMaster) } > > > Completely unrolling the iterator in a single line. Above it, the > CacheManager does the exact same thing with: > > val elements = new ArrayBuffer[Any] > elements ++= computedValues > blockManager.put(key, elements, storageLevel, tellMaster = true) > > > We would probably have to implement some sort of 'IteratorBuffer' class, > which would wrap an iterator. It would include a method to unroll an > iterator into a buffer up to a point, something like > > def unroll(maxMem:Long) : Boolean ={ ...} > > And it would return True if the maxMem was hit. At which point > BlockManager could read through the already cached values, then continue on > through the rest of the iterators dumping all the values to file. If it > unrolled without hitting maxMem (which would probably be most of the time), > the class would simply wrap the ArrayBuffer of cached values. > > Kyle > > > > On Sun, Nov 3, 2013 at 12:50 AM, Reynold Xin <r...@apache.org> wrote: > > > It's not a very elegant solution, but one possibility is for the > > CacheManager to check whether it will have enough space. If it is > > running out of space, skips buffering the output of the iterator & > > directly write the output of the iterator to disk (if storage level > allows that). > > > > But it is still tricky to know whether we will run out of space before > > we even start running the iterator. One possibility is to use sizing > > data from previous partitions to estimate the size of the current > partition (i.e. > > estimated in memory size = avg of current in-memory size / current > > input size). > > > > Do you have any ideas on this one, Kyle? > > > > > > On Sat, Oct 26, 2013 at 10:53 AM, Kyle Ellrott <kellr...@soe.ucsc.edu > > >wrote: > > > > > I was wondering if anybody had any thoughts on the best way to > > > tackle > > > SPARK-942 ( https://spark-project.atlassian.net/browse/SPARK-942 ). > > > Basically, Spark takes an iterator from a flatmap call and because I > > > tell it that it needs to persist Spark proceeds to push it all into > > > an array before deciding that it doesn't have enough memory and > > > trying to > > serialize > > > it to disk, and somewhere along the line it runs out of memory. For > > > my particular operation, the function return an iterator that reads > > > data out of a file, and the size of the files passed to that > > > function can vary greatly (from a few kilobytes to a few gigabytes). > > > The funny thing is > > that > > > if I do a strait 'map' operation after the flat map, everything > > > works, because Spark just passes the iterator forward and never > > > tries to expand the whole thing into memory. But I need do a > > > reduceByKey across all the records, so I'd like to persist to disk > > > first, and that is where I hit > > this > > > snag. > > > I've already setup a unit test to replicate the problem, and I know > > > the area of the code that would need to be fixed. > > > I'm just hoping for some tips on the best way to fix the problem. > > > > > > Kyle > > > > > >