+1

The providers and corresponding interfaces proposed in CALCITE-603 LGTM; I
think they cover the needs that were arising when we were moving forward
with cost-optimization/algorithm-selection in Hive's optimization phase.

Thanks,
Jesús


On 2/24/15, 7:29 PM, "Julian Hyde" <[email protected]> wrote:

>I think your "phase" concept matches my "stage". I'll use "phase" too.
>
>Agree, we should not separate the sender and receiver of an Exchange
>into separate RelNodes. I didn't mean to give that impression. Maybe I
>should call it "isPhaseTransition" rather than "isStageLeaf".
>
>Hive/Tez does not have a concept of threads (as distinct from
>processes). But I think the "split" concept will serve both Hive and
>Drill.
>
>> So maybe the question is, do you delineate these as two different
>>concepts or combine them into one (memory usage stages and
>>parallelization change phases (e.g. exchanges)).
>
>Really good question. However, I'm going to punt. I think there is
>more complexity over the horizon when we start modeling blocking
>operators, phased pipelines where phase 1 starts releasing memory as
>phase 2 starts allocating it.
>
>"isStageLeaf" allows us to model a collection of consecutive operators
>that function as a single unit for purposes of memory allocation, and
>that's a good start. (If you want to detect a change in distribution,
>look at the Distribution metadata.)
>
>On Tue, Feb 24, 2015 at 10:59 AM, Jacques Nadeau <[email protected]>
>wrote:
>> some thoughts
>>
>> - We have generic (specific) terms we use to explain these concepts:
>>phase
>> (major fragment) & slice (minor fragment or thread).
>> - It isn't clear to me why Parallelism needs to expose stageLeaf.  We
>>are
>> obviously aware of this fact but I'm not sure why it should be in the
>> framework as a specialized concept.
>>
>> Note that for planning we also don't separate out the sending and
>>receiving
>> side of an exchange because it is often useful to reason about both
>> concepts at the same time.  For example affinity mapping.
>>
>> To be clear, we mean phase (major fragment) as a unit between two
>>exchanges
>> (or leaf fragment and root fragments which are delineated by an
>>exchange).
>> Note that this is different from what we mean by stages which is a
>>separate
>> concept that describes memory transition states.  For example, you might
>> have hash join.  The join will separate the build side versus the probe
>> side as two separate stages.  Other blocking or partially blocking
>> operators may also separate stages and memory accounting needs to
>> understand both stages and phases.
>>
>> So maybe the question is, do you delineate these as two different
>>concepts
>> or combine them into one (memory usage stages and parallelization change
>> phases (e.g. exchanges)).
>>
>>
>> On Tue, Feb 24, 2015 at 10:45 AM, Julian Hyde <[email protected]> wrote:
>>
>>> Jesus,
>>>
>>> That makes sense. We basically need two carts: one in front of the
>>> horse (before we've determined parallelism), and one behind (after we
>>> have determined parallelism).
>>>
>>> As I said to Jacques, you could also use the "behind" cart with a
>>> place-holder value of parallelism. But you have to be careful that you
>>> don't use this calculation to determine parallelism.
>>>
>>> I have just checked into
>>> https://github.com/julianhyde/incubator-calcite/tree/calcite-603 a new
>>> metadata provider:
>>>
>>> interface Size {
>>>   Double averageRowSize();
>>>   List<Double> averageColumnSizes();
>>> }
>>>
>>> Then I propose to add the following providers. (Recall that a metadata
>>> providers is a mix-in interface to RelNode; each method is evaluated
>>> for a particular RelNode.)
>>>
>>> interface Parallelism {
>>>   /** Returns true if each physical operator implementing this
>>> relational expression
>>>     * belongs to a different process than its inputs. */
>>>   boolean isStageLeaf();
>>>
>>>   /** Returns the number of distinct splits of the data.
>>>     *
>>>     * <p>For broadcast, where each copy is the same, returns 1. */
>>>  int splitCount();
>>> }
>>>
>>> interface Memory {
>>>   /** Returns the expected amount of memory, in bytes, required by a
>>> physical operator
>>>    * implementing this relational expression, across all splits.
>>>    *
>>>    * <p>How much memory is used depends on the algorithm; for example,
>>> an implementation
>>>    *  Aggregate that builds a hash table requires approximately
>>> rowCount * averageRowSize bytes,
>>>    * whereas an implementation that assumes that the input is sorted
>>> uses only averageRowSize. */
>>>   Double memory();
>>>
>>>   /** Returns the cumulative amount of memory, in bytes, required by
>>> the physical operator
>>>    * implementing this relational expression, and all operators within
>>> the same stage,
>>>    * across all splits. */
>>>   Double cumulativeMemoryWithinStage();
>>>
>>>   /** Returns the expected cumulative amount of memory, in bytes,
>>> required by the physical operator
>>>    * implementing this relational expression, and all operators within
>>> the same stage,
>>>    * within each split.
>>>    *
>>>    * <p>Basic formula:
>>>    *   cumulativeMemoryWithinStageSplit
>>>    *     = cumulativeMemoryWithinStage / Parallelism.splitCount */
>>> Double cumulativeMemoryWithinStageSplit();
>>> }
>>>
>>> If you have not yet determined the parallelism, use
>>> cumulativeMemoryWithinStage; if you have determined parallelism, use
>>> cumulativeMemoryWithinStageSplit.
>>>
>>> What do y'all think of my terminology: split, stage, stage-leaf,
>>> process, memory. (It's not going to be the same as every engine, but
>>> if it's at least clear.)
>>>
>>> Julian
>>>
>>> On Tue, Feb 24, 2015 at 10:34 AM, Julian Hyde <[email protected]>
>>> wrote:
>>> > Yes, absolutely, that's the only sane way to approach this.
>>> >
>>> > Calcite's metadata provider model does make it possible to use an
>>> estimate of parallelism at one stage of planning, then use the real
>>>number
>>> at a later stage.
>>> >
>>> > In my next email I propose some new metadata providers. Hopefully
>>>they
>>> could be fitted into Drill's planning process. Also advise whether the
>>> terminology (processes, splits, stages, memory, size) seems intuitive
>>>even
>>> though not tied to a particular execution engine.
>>> >
>>> > Julian
>>> >
>>> >
>>> >> On Feb 24, 2015, at 10:23 AM, Jacques Nadeau <[email protected]>
>>> wrote:
>>> >>
>>> >> A little food for thought given our work on Drill...
>>> >>
>>> >> We punted on trying to optimize all of this at once.  Very little
>>>of the
>>> >> core plan exploration process is influenced directly by degree of
>>> >> parallelism.  We do manage between parallelized and not and use a
>>>few
>>> >> "hints" during planning to make some decisions where we need to.
>>> >> Afterwards, we do a secondary pass where we determine parallelism
>>>and
>>> >> memory consumption.  Afterwards, we will do a replan with more
>>> conservative
>>> >> memory settings if our first plan turns out not to fit into
>>>available
>>> >> cluster memory.  While we may lose some plans that are ideal, it
>>>makes
>>> the
>>> >> whole process substantially easier to reason about.
>>> >>
>>> >> We also did this because some of our initial experimentation where
>>>we
>>> >> included a number of these things as part of the planning process
>>>caused
>>> >> the planning time to get out of hand.
>>> >>
>>> >>
>>> >>
>>> >>
>>> >>
>>> >> On Mon, Feb 23, 2015 at 1:26 PM, Julian Hyde <[email protected]>
>>>wrote:
>>> >>
>>> >>> I am currently helping the Hive team compute the memory usage of
>>> >>> operators (more precisely, a set of operators that live in the same
>>> >>> process) using a Calcite metadata provider.
>>> >>>
>>> >>> Part of this task is to create an ³Average tuple size² metadata
>>> >>> provider based on a ³Average column size² metadata provider. This
>>>is
>>> >>> fairly uncontroversial. (I¹ll create a jira case when
>>> >>> https://issues.apache.org/ is back up, and we can discuss details
>>>such
>>> >>> as how to compute average size of columns computed using built-in
>>> >>> functions, user-defined functions, or aggregate functions.)
>>> >>>
>>> >>> Also we want to create a ³CumulativeMemoryUseWithinProcess²
>>>metadata
>>> >>> provider. That would start at 0 for a table scan or exchange
>>>consumer,
>>> >>> but increase for each operator building a hash-table or using sort
>>> >>> buffers until we reach the edge of the process.
>>> >>>
>>> >>> But we realized that we also need to determine the degree of
>>> >>> parallelism, because we need to multiply AverageTupleSize not by
>>> >>> RowCount but by RowCountWithinPartition. (If the data set has 100M
>>> >>> rows, each 100 bytes and is bucketed 5 ways then each process will
>>> >>> need memory for 20M rows, i.e 20M rows * 100 bytes/row = 2GB.)
>>> >>>
>>> >>> Now, if we are already at the stage of planning where we have
>>>already
>>> >>> determined the degree of parallelism, then we could expose this as
>>>a
>>> >>> ParallelismDegree metadata provider.
>>> >>>
>>> >>> But maybe we¹re getting the cart before the horse? Maybe we should
>>> >>> compute the degree of parallelism AFTER we have assigned operators
>>>to
>>> >>> processes. We actually want to choose the parallelism degree such
>>>that
>>> >>> we can fit all necessary data in memory. There might be several
>>> >>> operators, all building hash-tables or using sort buffers. The
>>>natural
>>> >>> break point (at least in Tez) is where the data needs to be
>>> >>> re-partitioned, i.e. an Exchange operator.
>>> >>>
>>> >>> So maybe we should instead compute
>>>³CumulativeMemoryUseWithinStage².
>>> >>> (I'm coining the term ³stage² to mean all operators within like
>>> >>> processes, summing over all buckets.) Let¹s suppose that, in the
>>>above
>>> >>> data set, we have two operators, each of which needs to build a
>>>hash
>>> >>> table, and we have 2GB memory available to each process.
>>> >>> CumulativeMemoryUseWithinStage is 100M rows * 100 bytes/row * 2
>>> >>> hash-tables = 20GB. So, the parallelism degree should be 20GB /
>>>2GB =
>>> >>> 10.
>>> >>>
>>> >>> 10 is a better choice of parallelism degree than 5. We lose a
>>>little
>>> >>> (more nodes used, and more overhead combining the 10 partitions
>>>into
>>> >>> 1) but gain a lot (we save the cost of sending the data over the
>>> >>> network).
>>> >>>
>>> >>> Thoughts on this? How do other projects determine the degree of
>>> >>> parallelism?
>>> >>>
>>> >>> Julian
>>> >>>
>>> >
>>>

Reply via email to