Jesus,

That makes sense. We basically need two carts: one in front of the
horse (before we've determined parallelism), and one behind (after we
have determined parallelism).

As I said to Jacques, you could also use the "behind" cart with a
place-holder value of parallelism, but you have to be careful not to use
that calculation to determine the parallelism itself.

I have just checked into
https://github.com/julianhyde/incubator-calcite/tree/calcite-603 a new
metadata provider:

interface Size {
  /** Returns the average size in bytes of a row of this relational expression. */
  Double averageRowSize();

  /** Returns the average size in bytes of the value of each column. */
  List<Double> averageColumnSizes();
}
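
For context, the "average tuple size based on average column size" idea from
my earlier mail amounts to something like the following. This is only a
sketch; the handler class and its signature are invented for illustration and
are not part of what I checked in:

import java.util.List;

class SizeHandler {
  /** Illustrative default rule: the average row size is the sum of the
   * average column sizes, or null if any column size is unknown. */
  Double averageRowSize(List<Double> averageColumnSizes) {
    double total = 0d;
    for (Double columnSize : averageColumnSizes) {
      if (columnSize == null) {
        return null; // unknown column size => unknown row size
      }
      total += columnSize;
    }
    return total;
  }
}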

Then I propose to add the following providers. (Recall that a metadata
provider is a mix-in interface to RelNode; each method is evaluated for a
particular RelNode.)

interface Parallelism {
  /** Returns true if each physical operator implementing this relational
   * expression belongs to a different process than its inputs. */
  boolean isStageLeaf();

  /** Returns the number of distinct splits of the data.
   *
   * <p>For broadcast, where each copy is the same, returns 1. */
  int splitCount();
}
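
To make splitCount concrete, here is a rough, self-contained sketch of the
rule in the javadoc; the handler class and the enum are invented for
illustration, not part of the checked-in code:

class ParallelismHandler {
  enum DistributionType { BROADCAST, HASH_PARTITIONED, SINGLETON }

  /** Illustrative rule: a broadcast delivers identical copies, so it counts
   * as one split; a hash-partitioned stream has one split per bucket. */
  int splitCount(DistributionType type, int bucketCount) {
    switch (type) {
    case BROADCAST:
      return 1;           // each copy is the same
    case HASH_PARTITIONED:
      return bucketCount; // e.g. 5 for a table bucketed 5 ways
    default:
      return 1;           // a single process
    }
  }
}

So a scan of a table bucketed 5 ways would report splitCount = 5.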

interface Memory {
  /** Returns the expected amount of memory, in bytes, required by a physical
   * operator implementing this relational expression, across all splits.
   *
   * <p>How much memory is used depends on the algorithm; for example, an
   * implementation of Aggregate that builds a hash table requires
   * approximately rowCount * averageRowSize bytes, whereas an implementation
   * that assumes that the input is sorted uses only averageRowSize. */
  Double memory();

  /** Returns the cumulative amount of memory, in bytes, required by the
   * physical operator implementing this relational expression, and all
   * operators within the same stage, across all splits. */
  Double cumulativeMemoryWithinStage();

  /** Returns the expected cumulative amount of memory, in bytes, required by
   * the physical operator implementing this relational expression, and all
   * operators within the same stage, within each split.
   *
   * <p>Basic formula:
   *   cumulativeMemoryWithinStageSplit
   *     = cumulativeMemoryWithinStage / Parallelism.splitCount */
  Double cumulativeMemoryWithinStageSplit();
}

If you have not yet determined the parallelism, use
cumulativeMemoryWithinStage; if you have determined parallelism, use
cumulativeMemoryWithinStageSplit.
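
To tie the three methods together, here is a rough sketch of the arithmetic.
Again, the handler class and the way I pass in the inputs' values are
invented for illustration, not what I checked in:

class MemoryHandler {
  /** An operator's cumulative memory within its stage is its own memory plus
   * the cumulative memory of its inputs, unless the operator is a stage leaf
   * (it runs in a different process than its inputs, so accumulation
   * restarts). */
  double cumulativeMemoryWithinStage(double memory, boolean isStageLeaf,
      double[] inputCumulativeMemory) {
    double total = memory;
    if (!isStageLeaf) {
      for (double inputMemory : inputCumulativeMemory) {
        total += inputMemory;
      }
    }
    return total;
  }

  /** The basic formula from the javadoc above. */
  double cumulativeMemoryWithinStageSplit(double cumulativeMemoryWithinStage,
      int splitCount) {
    return cumulativeMemoryWithinStage / splitCount;
  }
}

With the numbers from my earlier mail (two hash tables over 100M rows of 100
bytes each), cumulativeMemoryWithinStage is about 20GB; with splitCount = 10,
cumulativeMemoryWithinStageSplit comes out at 2GB per split.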

What do y'all think of my terminology: split, stage, stage-leaf, process,
memory? (It won't match every engine exactly, but I hope it is at least
clear.)

Julian

On Tue, Feb 24, 2015 at 10:34 AM, Julian Hyde <[email protected]> wrote:
> Yes, absolutely, that's the only sane way to approach this.
>
> Calcite's metadata provider model does make it possible to use an estimate of 
> parallelism at one stage of planning, then use the real number at a later 
> stage.
>
> In my next email I propose some new metadata providers. Hopefully they could 
> be fitted into Drill's planning process. Also advise whether the terminology 
> (processes, splits, stages, memory, size) seems intuitive even though not 
> tied to a particular execution engine.
>
> Julian
>
>
>> On Feb 24, 2015, at 10:23 AM, Jacques Nadeau <[email protected]> wrote:
>>
>> A little food for thought given our work on Drill...
>>
>> We punted on trying to optimize all of this at once.  Very little of the
>> core plan exploration process is influenced directly by degree of
>> parallelism.  We do manage between parallelized and not and use a few
>> "hints" during planning to make some decisions where we need to.
>> Afterwards, we do a secondary pass where we determine parallelism and
>> memory consumption.  Afterwards, we will do a replan with more conservative
>> memory settings if our first plan turns out not to fit into available
>> cluster memory.  While we may lose some plans that are ideal, it makes the
>> whole process substantially easier to reason about.
>>
>> We also did this because some of our initial experimentation where we
>> included a number of these things as part of the planning process caused
>> the planning time to get out of hand.
>>
>>
>>
>>
>>
>> On Mon, Feb 23, 2015 at 1:26 PM, Julian Hyde <[email protected]> wrote:
>>
>>> I am currently helping the Hive team compute the memory usage of
>>> operators (more precisely, a set of operators that live in the same
>>> process) using a Calcite metadata provider.
>>>
>>> Part of this task is to create an “Average tuple size” metadata
>>> provider based on an “Average column size” metadata provider. This is
>>> fairly uncontroversial. (I’ll create a jira case when
>>> https://issues.apache.org/ is back up, and we can discuss details such
>>> as how to compute average size of columns computed using built-in
>>> functions, user-defined functions, or aggregate functions.)
>>>
>>> Also we want to create a “CumulativeMemoryUseWithinProcess” metadata
>>> provider. That would start at 0 for a table scan or exchange consumer,
>>> but increase for each operator building a hash-table or using sort
>>> buffers until we reach the edge of the process.
>>>
>>> But we realized that we also need to determine the degree of
>>> parallelism, because we need to multiply AverageTupleSize not by
>>> RowCount but by RowCountWithinPartition. (If the data set has 100M
>>> rows, each 100 bytes and is bucketed 5 ways then each process will
>>> need memory for 20M rows, i.e. 20M rows * 100 bytes/row = 2GB.)
>>>
>>> Now, if we are already at the stage of planning where we have already
>>> determined the degree of parallelism, then we could expose this as a
>>> ParallelismDegree metadata provider.
>>>
>>> But maybe we’re getting the cart before the horse? Maybe we should
>>> compute the degree of parallelism AFTER we have assigned operators to
>>> processes. We actually want to choose the parallelism degree such that
>>> we can fit all necessary data in memory. There might be several
>>> operators, all building hash-tables or using sort buffers. The natural
>>> break point (at least in Tez) is where the data needs to be
>>> re-partitioned, i.e. an Exchange operator.
>>>
>>> So maybe we should instead compute “CumulativeMemoryUseWithinStage”.
>>> (I'm coining the term “stage” to mean all operators within like
>>> processes, summing over all buckets.) Let’s suppose that, in the above
>>> data set, we have two operators, each of which needs to build a hash
>>> table, and we have 2GB memory available to each process.
>>> CumulativeMemoryUseWithinStage is 100M rows * 100 bytes/row * 2
>>> hash-tables = 20GB. So, the parallelism degree should be 20GB / 2GB =
>>> 10.
>>>
>>> 10 is a better choice of parallelism degree than 5. We lose a little
>>> (more nodes used, and more overhead combining the 10 partitions into
>>> 1) but gain a lot (we save the cost of sending the data over the
>>> network).
>>>
>>> Thoughts on this? How do other projects determine the degree of
>>> parallelism?
>>>
>>> Julian
>>>
>
