I think your "phase" concept matches my "stage". I'll use "phase" too.
Agree, we should not separate the sender and receiver of an Exchange into separate RelNodes. I didn't mean to give that impression. Maybe I should call it "isPhaseTransition" rather than "isStageLeaf". Hive/Tez does not have a concept of threads (as distinct from processes). But I think the "split" concept will serve both Hive and Drill.

> So maybe the question is, do you delineate these as two different concepts or
> combine them into one (memory usage stages and parallelization change phases
> (e.g. exchanges)).

Really good question. However, I'm going to punt. I think there is more complexity over the horizon when we start modeling blocking operators and phased pipelines, where phase 1 starts releasing memory as phase 2 starts allocating it. "isStageLeaf" allows us to model a collection of consecutive operators that function as a single unit for purposes of memory allocation, and that's a good start. (If you want to detect a change in distribution, look at the Distribution metadata.)
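
To make that "single unit for memory allocation" idea concrete, here is a rough sketch of how a default cumulativeMemoryWithinStage could fold per-operator memory up to the nearest stage leaf. The helpers memory() and isStageLeaf() merely stand in for the providers proposed below; they are illustrative, not existing Calcite methods.

  import org.apache.calcite.rel.RelNode;

  abstract class MemorySketch {
    abstract double memory(RelNode rel);        // stand-in for the proposed Memory.memory()
    abstract boolean isStageLeaf(RelNode rel);  // stand-in for the proposed Parallelism.isStageLeaf()

    /** Memory used by this operator plus all operators in the same stage. */
    double cumulativeMemoryWithinStage(RelNode rel) {
      double total = memory(rel);
      if (isStageLeaf(rel)) {
        // A stage leaf's inputs run in a different process, so stop accumulating here.
        return total;
      }
      for (RelNode input : rel.getInputs()) {
        total += cumulativeMemoryWithinStage(input);
      }
      return total;
    }
  }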

On Tue, Feb 24, 2015 at 10:59 AM, Jacques Nadeau <[email protected]> wrote:
> some thoughts
>
>    - We have generic (specific) terms we use to explain these concepts: phase
>    (major fragment) & slice (minor fragment or thread).
>    - It isn't clear to me why Parallelism needs to expose stageLeaf. We are
>    obviously aware of this fact but I'm not sure why it should be in the
>    framework as a specialized concept.
>
> Note that for planning we also don't separate out the sending and receiving
> side of an exchange, because it is often useful to reason about both
> concepts at the same time, for example affinity mapping.
>
> To be clear, we mean phase (major fragment) as a unit between two exchanges
> (or leaf and root fragments, which are delineated by an exchange). Note that
> this is different from what we mean by stages, which is a separate concept
> that describes memory transition states. For example, you might have a hash
> join. The join will separate the build side and the probe side as two
> separate stages. Other blocking or partially blocking operators may also
> separate stages, and memory accounting needs to understand both stages and
> phases.
>
> So maybe the question is, do you delineate these as two different concepts
> or combine them into one (memory usage stages and parallelization change
> phases (e.g. exchanges)).
>
> On Tue, Feb 24, 2015 at 10:45 AM, Julian Hyde <[email protected]> wrote:
>
>> Jesus,
>>
>> That makes sense. We basically need two carts: one in front of the
>> horse (before we've determined parallelism), and one behind (after we
>> have determined parallelism).
>>
>> As I said to Jacques, you could also use the "behind" cart with a
>> place-holder value of parallelism. But you have to be careful that you
>> don't use this calculation to determine parallelism.
>>
>> I have just checked into
>> https://github.com/julianhyde/incubator-calcite/tree/calcite-603 a new
>> metadata provider:
>>
>>   interface Size {
>>     Double averageRowSize();
>>     List<Double> averageColumnSizes();
>>   }
>>
>> Then I propose to add the following providers. (Recall that a metadata
>> provider is a mix-in interface to RelNode; each method is evaluated
>> for a particular RelNode.)
>>
>>   interface Parallelism {
>>     /** Returns true if each physical operator implementing this relational
>>      * expression belongs to a different process than its inputs. */
>>     boolean isStageLeaf();
>>
>>     /** Returns the number of distinct splits of the data.
>>      *
>>      * <p>For broadcast, where each copy is the same, returns 1. */
>>     int splitCount();
>>   }
>>
>>   interface Memory {
>>     /** Returns the expected amount of memory, in bytes, required by a
>>      * physical operator implementing this relational expression, across
>>      * all splits.
>>      *
>>      * <p>How much memory is used depends on the algorithm; for example, an
>>      * implementation of Aggregate that builds a hash table requires
>>      * approximately rowCount * averageRowSize bytes, whereas an
>>      * implementation that assumes that the input is sorted uses only
>>      * averageRowSize. */
>>     Double memory();
>>
>>     /** Returns the cumulative amount of memory, in bytes, required by the
>>      * physical operator implementing this relational expression, and all
>>      * operators within the same stage, across all splits. */
>>     Double cumulativeMemoryWithinStage();
>>
>>     /** Returns the expected cumulative amount of memory, in bytes, required
>>      * by the physical operator implementing this relational expression, and
>>      * all operators within the same stage, within each split.
>>      *
>>      * <p>Basic formula:
>>      *   cumulativeMemoryWithinStageSplit
>>      *     = cumulativeMemoryWithinStage / Parallelism.splitCount */
>>     Double cumulativeMemoryWithinStageSplit();
>>   }
>>
>> If you have not yet determined the parallelism, use
>> cumulativeMemoryWithinStage; if you have determined parallelism, use
>> cumulativeMemoryWithinStageSplit.
>>
>> What do y'all think of my terminology: split, stage, stage-leaf,
>> process, memory? (It's not going to be the same as every engine's, but
>> I hope it's at least clear.)
>>
>> Julian
>>
>> On Tue, Feb 24, 2015 at 10:34 AM, Julian Hyde <[email protected]> wrote:
>> > Yes, absolutely, that's the only sane way to approach this.
>> >
>> > Calcite's metadata provider model does make it possible to use an
>> > estimate of parallelism at one stage of planning, then use the real
>> > number at a later stage.
>> >
>> > In my next email I propose some new metadata providers. Hopefully they
>> > could be fitted into Drill's planning process. Also advise whether the
>> > terminology (processes, splits, stages, memory, size) seems intuitive
>> > even though not tied to a particular execution engine.
>> >
>> > Julian
>> >
>> >> On Feb 24, 2015, at 10:23 AM, Jacques Nadeau <[email protected]> wrote:
>> >>
>> >> A little food for thought given our work on Drill...
>> >>
>> >> We punted on trying to optimize all of this at once. Very little of the
>> >> core plan exploration process is influenced directly by degree of
>> >> parallelism. We do distinguish between parallelized and non-parallelized
>> >> operations and use a few "hints" during planning to make some decisions
>> >> where we need to. Afterwards, we do a secondary pass where we determine
>> >> parallelism and memory consumption, and we then replan with more
>> >> conservative memory settings if the first plan turns out not to fit into
>> >> available cluster memory. While we may lose some plans that are ideal,
>> >> it makes the whole process substantially easier to reason about.
>> >>
>> >> We also did this because in some of our initial experimentation, where
>> >> we included a number of these things as part of the planning process,
>> >> planning time got out of hand.
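
As a sketch of the two-pass flow Jacques describes above: plan first, then determine parallelism and memory, then replan if the plan doesn't fit. Every name below is invented purely for illustration; none of these are actual Drill classes or methods.

  // Illustrative only: names are placeholders for the steps described above.
  Plan planWithMemoryCheck(Query query, long availableClusterMemory) {
    Plan plan = planQuery(query, MemorySettings.DEFAULT);   // parallelism-agnostic exploration
    assignParallelism(plan);                                // secondary pass: splits per phase
    if (estimateClusterMemory(plan) > availableClusterMemory) {
      // First plan doesn't fit; replan with a tighter per-operator memory budget.
      plan = planQuery(query, MemorySettings.CONSERVATIVE);
      assignParallelism(plan);
    }
    return plan;
  }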
>> >>
>> >> On Mon, Feb 23, 2015 at 1:26 PM, Julian Hyde <[email protected]> wrote:
>> >>
>> >>> I am currently helping the Hive team compute the memory usage of
>> >>> operators (more precisely, a set of operators that live in the same
>> >>> process) using a Calcite metadata provider.
>> >>>
>> >>> Part of this task is to create an “Average tuple size” metadata
>> >>> provider based on an “Average column size” metadata provider. This is
>> >>> fairly uncontroversial. (I’ll create a jira case when
>> >>> https://issues.apache.org/ is back up, and we can discuss details such
>> >>> as how to compute the average size of columns computed using built-in
>> >>> functions, user-defined functions, or aggregate functions.)
>> >>>
>> >>> Also we want to create a “CumulativeMemoryUseWithinProcess” metadata
>> >>> provider. That would start at 0 for a table scan or exchange consumer,
>> >>> but increase for each operator building a hash-table or using sort
>> >>> buffers until we reach the edge of the process.
>> >>>
>> >>> But we realized that we also need to determine the degree of
>> >>> parallelism, because we need to multiply AverageTupleSize not by
>> >>> RowCount but by RowCountWithinPartition. (If the data set has 100M
>> >>> rows, each 100 bytes, and is bucketed 5 ways, then each process will
>> >>> need memory for 20M rows, i.e. 20M rows * 100 bytes/row = 2GB.)
>> >>>
>> >>> Now, if we are already at the stage of planning where we have already
>> >>> determined the degree of parallelism, then we could expose this as a
>> >>> ParallelismDegree metadata provider.
>> >>>
>> >>> But maybe we’re putting the cart before the horse? Maybe we should
>> >>> compute the degree of parallelism AFTER we have assigned operators to
>> >>> processes. We actually want to choose the parallelism degree such that
>> >>> we can fit all necessary data in memory. There might be several
>> >>> operators, all building hash-tables or using sort buffers. The natural
>> >>> break point (at least in Tez) is where the data needs to be
>> >>> re-partitioned, i.e. an Exchange operator.
>> >>>
>> >>> So maybe we should instead compute “CumulativeMemoryUseWithinStage”.
>> >>> (I'm coining the term “stage” to mean all operators within like
>> >>> processes, summing over all buckets.) Let’s suppose that, in the above
>> >>> data set, we have two operators, each of which needs to build a hash
>> >>> table, and we have 2GB of memory available to each process.
>> >>> CumulativeMemoryUseWithinStage is 100M rows * 100 bytes/row * 2
>> >>> hash-tables = 20GB. So the parallelism degree should be 20GB / 2GB = 10.
>> >>>
>> >>> 10 is a better choice of parallelism degree than 5. We lose a little
>> >>> (more nodes used, and more overhead combining the 10 partitions into 1)
>> >>> but gain a lot (we save the cost of sending the data over the network).
>> >>>
>> >>> Thoughts on this? How do other projects determine the degree of
>> >>> parallelism?
>> >>>
>> >>> Julian
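
For the record, the arithmetic from my Feb 23 message above, written out as a few lines of illustrative Java (the constants just mirror that example; nothing here is an actual API):

  // 100M rows of ~100 bytes, two hash tables in the stage, 2GB per process.
  long rowCount = 100_000_000L;
  long avgRowSizeBytes = 100L;
  int bufferingOperators = 2;                    // two hash tables in the same stage
  long memoryPerProcess = 2_000_000_000L;        // 2GB available to each process

  long stageMemory = rowCount * avgRowSizeBytes * bufferingOperators;          // 20GB
  long parallelism = (stageMemory + memoryPerProcess - 1) / memoryPerProcess;  // ceiling division
  System.out.println(parallelism);               // 10, rather than the 5 implied by the bucketing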
