Thanks!  Sounds like my rough understanding was roughly right :)

Definitely understand cached RDDs can add to the memory requirements.
 Luckily, like you mentioned, you can configure spark to flush that to disk
and bound its total size in memory via spark.storage.memoryFraction, so I
have a pretty good handle on the overall RDD contribution.

Thanks for all the help.

Keith


On Wed, May 28, 2014 at 6:43 AM, Christopher Nguyen <c...@adatao.com> wrote:

> Keith, please see inline.
>
> --
> Christopher T. Nguyen
> Co-founder & CEO, Adatao <http://adatao.com>
> linkedin.com/in/ctnguyen
>
>
>
> On Tue, May 27, 2014 at 7:22 PM, Keith Simmons <ke...@pulse.io> wrote:
>
>> A dash of both.  I want to know enough that I can "reason about", rather
>> than "strictly control", the amount of memory Spark will use.  If I have a
>> big data set, I want to understand how I can design it so that Spark's
>> memory consumption falls below my available resources.  Or alternatively,
>> if it's even possible for Spark to process a data set over a certain size.
>>  And if I run into memory problems, I want to know which knobs to turn, and
>> how turning those knobs will affect memory consumption.
>>
>
> In practice, to avoid OOME, a key dial we use is the size (or inversely,
> number) of the partitions of your dataset. Clearly there is some "blow-up
> factor" F such that, e.g., if you start out with 128MB on-disk data
> partitions, you would consume 128F MB of memory, both by Spark and by your
> closure code. Knowing this, you would want to size the partitions such that
> AvailableMemoryInMBPerWorker / NumberOfCoresPerWorker > 128F. To arrive at
> F, you could do some back-of-the-envelope modeling, and/or run the job and
> observe empirically.
>
>
>>
>> It's my understanding that between certain key stages in a Spark DAG
>> (i.e. group by stages), Spark will serialize all data structures necessary
>> to continue the computation at the next stage, including closures.  So in
>> theory, per machine, Spark only needs to hold the transient memory required
>> to process the partitions assigned to the currently active tasks.  Is my
>> understanding correct?  Specifically, once a key/value pair is serialized
>> in the shuffle stage of a task, are the references to the raw java objects
>> released before the next task is started.
>>
>
> Yes, that is correct in non-cached mode. At the same time, Spark also does
> something else optionally, which is to keep the data structures (RDDs)
> persistent in memory (*). As such it is possible partitions that are not
> being actively worked on to be consuming memory. Spark will spill all these
> to local disk if they take up more memory than it is allowed to take. So
> the key thing to worry about is less about what Spark does (apart of
> overhead and yes, the possibility of bugs that need to be fixed), and more
> about what your closure code does with JVM memory as a whole. If in doubt,
> refer back to the "blow-up factor" model described above.
>
> (*) this is a fundamentally differentiating feature of Spark over a range
> of other "in-memory" architectures, that focus on raw-data or transient
> caches that serve non-equivalent purposes when viewed from the application
> level. It allows for very fast access to ready-to-consume high-level data
> structures, as long as available RAM permits.
>
>
>>
>>
>> On Tue, May 27, 2014 at 6:21 PM, Christopher Nguyen <c...@adatao.com>wrote:
>>
>>> Keith, do you mean "bound" as in (a) strictly control to some
>>> quantifiable limit, or (b) try to minimize the amount used by each task?
>>>
>>> If "a", then that is outside the scope of Spark's memory management,
>>> which you should think of as an application-level (that is, above JVM)
>>> mechanism. In this scope, Spark "voluntarily" tracks and limits the amount
>>> of memory it uses for explicitly known data structures, such as RDDs. What
>>> Spark cannot do is, e.g., control or manage the amount of JVM memory that a
>>> given piece of user code might take up. For example, I might write some
>>> closure code that allocates a large array of doubles unbeknownst to Spark.
>>>
>>> If "b", then your thinking is in the right direction, although quite
>>> imperfect, because of things like the example above. We often experience
>>> OOME if we're not careful with job partitioning. What I think Spark needs
>>> to evolve to is at least to include a mechanism for application-level hints
>>> about task memory requirements. We might work on this and submit a PR for
>>> it.
>>>
>>> --
>>> Christopher T. Nguyen
>>> Co-founder & CEO, Adatao <http://adatao.com>
>>> linkedin.com/in/ctnguyen
>>>
>>>
>>>
>>> On Tue, May 27, 2014 at 5:33 PM, Keith Simmons <ke...@pulse.io> wrote:
>>>
>>>> I'm trying to determine how to bound my memory use in a job working
>>>> with more data than can simultaneously fit in RAM.  From reading the tuning
>>>> guide, my impression is that Spark's memory usage is roughly the following:
>>>>
>>>> (A) In-Memory RDD use + (B) In memory Shuffle use + (C) Transient
>>>> memory used by all currently running tasks
>>>>
>>>> I can bound A with spark.storage.memoryFraction and I can bound B with 
>>>> spark.shuffle.memoryFraction.
>>>>  I'm wondering how to bound C.
>>>>
>>>> It's been hinted at a few times on this mailing list that you can
>>>> reduce memory use by increasing the number of partitions.  That leads me to
>>>> believe that the amount of transient memory is roughly follows:
>>>>
>>>> total_data_set_size/number_of_partitions *
>>>> number_of_tasks_simultaneously_running_per_machine
>>>>
>>>> Does this sound right?  In other words, as I increase the number of
>>>> partitions, the size of each partition will decrease, and since each task
>>>> is processing a single partition and there are a bounded number of tasks in
>>>> flight, my memory use has a rough upper limit.
>>>>
>>>> Keith
>>>>
>>>
>>>
>>
>

Reply via email to