Jesus, that makes sense. We basically need two carts: one in front of the horse (before we have determined parallelism), and one behind (after we have determined parallelism).
As I said to Jacques, you could also use the "behind" cart with a place-holder value of parallelism. But you have to be careful that you don't use this calculation to determine parallelism.

I have just checked in to https://github.com/julianhyde/incubator-calcite/tree/calcite-603 a new metadata provider:

  interface Size {
    Double averageRowSize();
    List<Double> averageColumnSizes();
  }

Then I propose to add the following providers. (Recall that a metadata provider is a mix-in interface to RelNode; each method is evaluated for a particular RelNode.)

  interface Parallelism {
    /** Returns true if each physical operator implementing this relational
     * expression belongs to a different process than its inputs. */
    boolean isStageLeaf();

    /** Returns the number of distinct splits of the data.
     *
     * <p>For broadcast, where each copy is the same, returns 1. */
    int splitCount();
  }

  interface Memory {
    /** Returns the expected amount of memory, in bytes, required by a
     * physical operator implementing this relational expression, across
     * all splits.
     *
     * <p>How much memory is used depends on the algorithm; for example, an
     * implementation of Aggregate that builds a hash table requires
     * approximately rowCount * averageRowSize bytes, whereas an
     * implementation that assumes that the input is sorted uses only
     * averageRowSize bytes. */
    Double memory();

    /** Returns the cumulative amount of memory, in bytes, required by the
     * physical operator implementing this relational expression, and all
     * operators within the same stage, across all splits. */
    Double cumulativeMemoryWithinStage();

    /** Returns the expected cumulative amount of memory, in bytes, required
     * by the physical operator implementing this relational expression, and
     * all operators within the same stage, within each split.
     *
     * <p>Basic formula:
     *
     *   cumulativeMemoryWithinStageSplit
     *     = cumulativeMemoryWithinStage / Parallelism.splitCount
     */
    Double cumulativeMemoryWithinStageSplit();
  }

If you have not yet determined the parallelism, use cumulativeMemoryWithinStage; if you have determined parallelism, use cumulativeMemoryWithinStageSplit.

What do y'all think of my terminology: split, stage, stage-leaf, process, memory? (It's not going to match every engine, but I hope it's at least clear.)

Julian

On Tue, Feb 24, 2015 at 10:34 AM, Julian Hyde <[email protected]> wrote:
> Yes, absolutely, that's the only sane way to approach this.
>
> Calcite's metadata provider model does make it possible to use an estimate
> of parallelism at one stage of planning, then use the real number at a
> later stage.
>
> In my next email I propose some new metadata providers. Hopefully they
> could be fitted into Drill's planning process. Also advise whether the
> terminology (processes, splits, stages, memory, size) seems intuitive even
> though not tied to a particular execution engine.
>
> Julian
>
>
>> On Feb 24, 2015, at 10:23 AM, Jacques Nadeau <[email protected]> wrote:
>>
>> A little food for thought given our work on Drill...
>>
>> We punted on trying to optimize all of this at once. Very little of the
>> core plan exploration process is influenced directly by the degree of
>> parallelism. We do manage between parallelized and not, and use a few
>> "hints" during planning to make some decisions where we need to.
>> Afterwards, we do a secondary pass where we determine parallelism and
>> memory consumption. Then, if our first plan turns out not to fit into
>> available cluster memory, we replan with more conservative memory
>> settings. While we may lose some plans that are ideal, it makes the
>> whole process substantially easier to reason about.
>>
>> We also did this because some of our initial experimentation, where we
>> included a number of these things as part of the planning process, caused
>> the planning time to get out of hand.
>>
>>
>> On Mon, Feb 23, 2015 at 1:26 PM, Julian Hyde <[email protected]> wrote:
>>
>>> I am currently helping the Hive team compute the memory usage of
>>> operators (more precisely, a set of operators that live in the same
>>> process) using a Calcite metadata provider.
>>>
>>> Part of this task is to create an “Average tuple size” metadata
>>> provider based on an “Average column size” metadata provider. This is
>>> fairly uncontroversial. (I’ll create a jira case when
>>> https://issues.apache.org/ is back up, and we can discuss details such
>>> as how to compute the average size of columns computed using built-in
>>> functions, user-defined functions, or aggregate functions.)
>>>
>>> Also we want to create a “CumulativeMemoryUseWithinProcess” metadata
>>> provider. That would start at 0 for a table scan or exchange consumer,
>>> but increase for each operator building a hash table or using sort
>>> buffers, until we reach the edge of the process.
>>>
>>> But we realized that we also need to determine the degree of
>>> parallelism, because we need to multiply AverageTupleSize not by
>>> RowCount but by RowCountWithinPartition. (If the data set has 100M
>>> rows, each 100 bytes, and is bucketed 5 ways, then each process will
>>> need memory for 20M rows, i.e. 20M rows * 100 bytes/row = 2GB.)
>>>
>>> Now, if we are already at the stage of planning where we have
>>> determined the degree of parallelism, then we could expose this as a
>>> ParallelismDegree metadata provider.
>>>
>>> But maybe we’re putting the cart before the horse? Maybe we should
>>> compute the degree of parallelism AFTER we have assigned operators to
>>> processes. We actually want to choose the parallelism degree such that
>>> we can fit all necessary data in memory. There might be several
>>> operators, all building hash tables or using sort buffers. The natural
>>> break point (at least in Tez) is where the data needs to be
>>> re-partitioned, i.e. an Exchange operator.
>>>
>>> So maybe we should instead compute “CumulativeMemoryUseWithinStage”.
>>> (I'm coining the term “stage” to mean all operators within like
>>> processes, summing over all buckets.) Let’s suppose that, in the above
>>> data set, we have two operators, each of which needs to build a hash
>>> table, and we have 2GB of memory available to each process.
>>> CumulativeMemoryUseWithinStage is 100M rows * 100 bytes/row * 2
>>> hash tables = 20GB. So, the parallelism degree should be 20GB / 2GB =
>>> 10.
>>>
>>> 10 is a better choice of parallelism degree than 5. We lose a little
>>> (more nodes used, and more overhead combining the 10 partitions into
>>> 1) but gain a lot (we save the cost of sending the data over the
>>> network).
>>>
>>> Thoughts on this? How do other projects determine the degree of
>>> parallelism?
>>>
>>> Julian
>>>
>
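P.S. To make the proposed Memory/Parallelism providers and the split formula concrete, here is a minimal, self-contained Java sketch. It is not the real Calcite metadata machinery; the class name HashAggregateExample and its fields are hypothetical, and only the interface method names follow the proposal above:

```java
// Illustrative sketch of the proposed metadata providers and the
// relationship between them. Plain interfaces only, not Calcite's
// actual handler/mix-in mechanism.
public class MemoryMetadataSketch {
  interface Parallelism {
    boolean isStageLeaf();
    int splitCount();
  }

  interface Memory {
    double memory();
    double cumulativeMemoryWithinStage();
    // Basic formula: cumulativeMemoryWithinStage / splitCount
    double cumulativeMemoryWithinStageSplit();
  }

  /** Hypothetical hash-based Aggregate: needs rowCount * averageRowSize bytes. */
  static class HashAggregateExample implements Memory, Parallelism {
    final double rowCount;
    final double averageRowSize;
    final int splits;

    HashAggregateExample(double rowCount, double averageRowSize, int splits) {
      this.rowCount = rowCount;
      this.averageRowSize = averageRowSize;
      this.splits = splits;
    }

    public boolean isStageLeaf() { return false; }
    public int splitCount() { return splits; }

    public double memory() { return rowCount * averageRowSize; }

    // Only one operator in this toy stage, so cumulative == own memory.
    public double cumulativeMemoryWithinStage() { return memory(); }

    public double cumulativeMemoryWithinStageSplit() {
      return cumulativeMemoryWithinStage() / splitCount();
    }
  }

  public static void main(String[] args) {
    // 100M rows, 100 bytes/row, 5 splits: 10 GB per stage, 2 GB per split.
    HashAggregateExample agg = new HashAggregateExample(100_000_000d, 100d, 5);
    System.out.println((long) agg.cumulativeMemoryWithinStage());
    System.out.println((long) agg.cumulativeMemoryWithinStageSplit());
  }
}
```

Before parallelism is decided, a planner would consult cumulativeMemoryWithinStage; once splitCount is known, cumulativeMemoryWithinStageSplit gives the per-process figure to compare against available memory.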

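P.P.S. The arithmetic in the quoted Feb 23 example (20GB stage / 2GB per process = degree 10) can be worked as a tiny sketch; the method names here are illustrative, not part of any proposed API:

```java
// Worked arithmetic from the quoted example: choose the parallelism
// degree so that each stage's cumulative memory fits in the memory
// available to one process.
public class ParallelismDegreeSketch {
  static long cumulativeMemoryWithinStage(long rowCount, long avgRowSize,
      int hashTables) {
    return rowCount * avgRowSize * hashTables;
  }

  static long parallelismDegree(long stageMemory, long memoryPerProcess) {
    // Round up, so every split fits in per-process memory.
    return (stageMemory + memoryPerProcess - 1) / memoryPerProcess;
  }

  public static void main(String[] args) {
    // 100M rows * 100 bytes/row * 2 hash tables = 20 GB for the stage.
    long stage = cumulativeMemoryWithinStage(100_000_000L, 100L, 2);
    // 2 GB per process -> parallelism degree 10.
    long degree = parallelismDegree(stage, 2_000_000_000L);
    System.out.println(stage);
    System.out.println(degree);
  }
}
```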