+1 The providers and corresponding interfaces proposed in CALCITE-603 LGTM; I think they cover the needs that were arising when we were moving forward with cost-optimization/algorithm-selection in Hive's optimization phase.
Thanks,
Jesús

On 2/24/15, 7:29 PM, "Julian Hyde" <[email protected]> wrote:

>I think your "phase" concept matches my "stage". I'll use "phase" too.
>
>Agree, we should not separate the sender and receiver of an Exchange
>into separate RelNodes. I didn't mean to give that impression. Maybe I
>should call it "isPhaseTransition" rather than "isStageLeaf".
>
>Hive/Tez does not have a concept of threads (as distinct from
>processes). But I think the "split" concept will serve both Hive and
>Drill.
>
>> So maybe the question is, do you delineate these as two different
>> concepts or combine them into one (memory usage stages and
>> parallelization change phases (e.g. exchanges))?
>
>Really good question. However, I'm going to punt. I think there is
>more complexity over the horizon when we start modeling blocking
>operators, and phased pipelines where phase 1 starts releasing memory
>as phase 2 starts allocating it.
>
>"isStageLeaf" allows us to model a collection of consecutive operators
>that function as a single unit for purposes of memory allocation, and
>that's a good start. (If you want to detect a change in distribution,
>look at the Distribution metadata.)
>
>On Tue, Feb 24, 2015 at 10:59 AM, Jacques Nadeau <[email protected]>
>wrote:
>> Some thoughts:
>>
>> - We have generic (specific) terms we use to explain these concepts:
>>   phase (major fragment) & slice (minor fragment or thread).
>> - It isn't clear to me why Parallelism needs to expose stageLeaf. We
>>   are obviously aware of this fact, but I'm not sure why it should be
>>   in the framework as a specialized concept.
>>
>> Note that for planning we also don't separate out the sending and
>> receiving side of an exchange, because it is often useful to reason
>> about both concepts at the same time; for example, affinity mapping.
>>
>> To be clear, we mean phase (major fragment) as a unit between two
>> exchanges (or leaf and root fragments, which are delineated by an
>> exchange).
>> Note that this is different from what we mean by stages, which is a
>> separate concept that describes memory transition states. For
>> example, you might have a hash join. The join will separate the build
>> side versus the probe side as two separate stages. Other blocking or
>> partially blocking operators may also separate stages, and memory
>> accounting needs to understand both stages and phases.
>>
>> So maybe the question is, do you delineate these as two different
>> concepts or combine them into one (memory usage stages and
>> parallelization change phases (e.g. exchanges))?
>>
>>
>> On Tue, Feb 24, 2015 at 10:45 AM, Julian Hyde <[email protected]> wrote:
>>
>>> Jesus,
>>>
>>> That makes sense. We basically need two carts: one in front of the
>>> horse (before we've determined parallelism), and one behind (after
>>> we have determined parallelism).
>>>
>>> As I said to Jacques, you could also use the "behind" cart with a
>>> place-holder value of parallelism. But you have to be careful that
>>> you don't use this calculation to determine parallelism.
>>>
>>> I have just checked into
>>> https://github.com/julianhyde/incubator-calcite/tree/calcite-603 a
>>> new metadata provider:
>>>
>>>   interface Size {
>>>     Double averageRowSize();
>>>     List<Double> averageColumnSizes();
>>>   }
>>>
>>> Then I propose to add the following providers. (Recall that a
>>> metadata provider is a mix-in interface to RelNode; each method is
>>> evaluated for a particular RelNode.)
>>>
>>>   interface Parallelism {
>>>     /** Returns true if each physical operator implementing this
>>>      * relational expression belongs to a different process than its
>>>      * inputs. */
>>>     boolean isStageLeaf();
>>>
>>>     /** Returns the number of distinct splits of the data.
>>>      *
>>>      * <p>For broadcast, where each copy is the same, returns 1.
>>>      */
>>>     int splitCount();
>>>   }
>>>
>>>   interface Memory {
>>>     /** Returns the expected amount of memory, in bytes, required by
>>>      * a physical operator implementing this relational expression,
>>>      * across all splits.
>>>      *
>>>      * <p>How much memory is used depends on the algorithm; for
>>>      * example, an implementation of Aggregate that builds a hash
>>>      * table requires approximately rowCount * averageRowSize bytes,
>>>      * whereas an implementation that assumes that the input is
>>>      * sorted uses only averageRowSize. */
>>>     Double memory();
>>>
>>>     /** Returns the cumulative amount of memory, in bytes, required
>>>      * by the physical operator implementing this relational
>>>      * expression, and all operators within the same stage, across
>>>      * all splits. */
>>>     Double cumulativeMemoryWithinStage();
>>>
>>>     /** Returns the expected cumulative amount of memory, in bytes,
>>>      * required by the physical operator implementing this
>>>      * relational expression, and all operators within the same
>>>      * stage, within each split.
>>>      *
>>>      * <p>Basic formula:
>>>      *   cumulativeMemoryWithinStageSplit
>>>      *     = cumulativeMemoryWithinStage / Parallelism.splitCount */
>>>     Double cumulativeMemoryWithinStageSplit();
>>>   }
>>>
>>> If you have not yet determined the parallelism, use
>>> cumulativeMemoryWithinStage; if you have determined parallelism, use
>>> cumulativeMemoryWithinStageSplit.
>>>
>>> What do y'all think of my terminology: split, stage, stage-leaf,
>>> process, memory? (It's not going to be the same in every engine, but
>>> I hope it's at least clear.)
>>>
>>> Julian
>>>
>>> On Tue, Feb 24, 2015 at 10:34 AM, Julian Hyde <[email protected]>
>>> wrote:
>>> > Yes, absolutely, that's the only sane way to approach this.
>>> >
>>> > Calcite's metadata provider model does make it possible to use an
>>> > estimate of parallelism at one stage of planning, then use the
>>> > real number at a later stage.
>>> >
>>> > In my next email I propose some new metadata providers.
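As a minimal illustration (hypothetical class and method names, not code from the calcite-603 branch), the figures behind the proposed Memory and Parallelism providers compose like this:

```java
/** Sketch only; names are illustrative, not part of the calcite-603 branch. */
public class MemoryModelSketch {

  /** A hash-based Aggregate needs approximately
   * rowCount * averageRowSize bytes, per the javadoc of Memory.memory(). */
  static double hashAggregateMemory(double rowCount, double averageRowSize) {
    return rowCount * averageRowSize;
  }

  /** Basic formula from the proposal:
   * cumulativeMemoryWithinStageSplit
   *   = cumulativeMemoryWithinStage / Parallelism.splitCount. */
  static double cumulativeMemoryWithinStageSplit(
      double cumulativeMemoryWithinStage, int splitCount) {
    return cumulativeMemoryWithinStage / splitCount;
  }

  public static void main(String[] args) {
    // 100M rows * 100 bytes/row = 10GB for the stage; 5 splits -> 2GB each.
    double stage = hashAggregateMemory(100e6, 100);
    System.out.println(cumulativeMemoryWithinStageSplit(stage, 5)); // prints 2.0E9
  }
}
```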
>>> > Hopefully they could be fitted into Drill's planning process. Also
>>> > advise whether the terminology (processes, splits, stages, memory,
>>> > size) seems intuitive even though not tied to a particular
>>> > execution engine.
>>> >
>>> > Julian
>>> >
>>> >
>>> >> On Feb 24, 2015, at 10:23 AM, Jacques Nadeau <[email protected]>
>>> >> wrote:
>>> >>
>>> >> A little food for thought given our work on Drill...
>>> >>
>>> >> We punted on trying to optimize all of this at once. Very little
>>> >> of the core plan exploration process is influenced directly by
>>> >> degree of parallelism. We do manage between parallelized and not,
>>> >> and use a few "hints" during planning to make some decisions
>>> >> where we need to. Afterwards, we do a secondary pass where we
>>> >> determine parallelism and memory consumption. Then we do a replan
>>> >> with more conservative memory settings if our first plan turns
>>> >> out not to fit into available cluster memory. While we may lose
>>> >> some plans that are ideal, it makes the whole process
>>> >> substantially easier to reason about.
>>> >>
>>> >> We also did this because some of our initial experimentation,
>>> >> where we included a number of these things as part of the
>>> >> planning process, caused the planning time to get out of hand.
>>> >>
>>> >> On Mon, Feb 23, 2015 at 1:26 PM, Julian Hyde <[email protected]>
>>> >> wrote:
>>> >>
>>> >>> I am currently helping the Hive team compute the memory usage of
>>> >>> operators (more precisely, a set of operators that live in the
>>> >>> same process) using a Calcite metadata provider.
>>> >>>
>>> >>> Part of this task is to create an "average tuple size" metadata
>>> >>> provider based on an "average column size" metadata provider.
>>> >>> This is fairly uncontroversial.
>>> >>> (I'll create a jira case when https://issues.apache.org/ is
>>> >>> back up, and we can discuss details such as how to compute the
>>> >>> average size of columns computed using built-in functions,
>>> >>> user-defined functions, or aggregate functions.)
>>> >>>
>>> >>> Also we want to create a "CumulativeMemoryUseWithinProcess"
>>> >>> metadata provider. That would start at 0 for a table scan or
>>> >>> exchange consumer, but increase for each operator building a
>>> >>> hash table or using sort buffers, until we reach the edge of the
>>> >>> process.
>>> >>>
>>> >>> But we realized that we also need to determine the degree of
>>> >>> parallelism, because we need to multiply AverageTupleSize not by
>>> >>> RowCount but by RowCountWithinPartition. (If the data set has
>>> >>> 100M rows of 100 bytes each and is bucketed 5 ways, then each
>>> >>> process will need memory for 20M rows, i.e. 20M rows * 100
>>> >>> bytes/row = 2GB.)
>>> >>>
>>> >>> Now, if we are already at the stage of planning where we have
>>> >>> determined the degree of parallelism, then we could expose this
>>> >>> as a ParallelismDegree metadata provider.
>>> >>>
>>> >>> But maybe we're putting the cart before the horse? Maybe we
>>> >>> should compute the degree of parallelism AFTER we have assigned
>>> >>> operators to processes. We actually want to choose the
>>> >>> parallelism degree such that we can fit all necessary data in
>>> >>> memory. There might be several operators, all building hash
>>> >>> tables or using sort buffers. The natural break point (at least
>>> >>> in Tez) is where the data needs to be re-partitioned, i.e. an
>>> >>> Exchange operator.
>>> >>>
>>> >>> So maybe we should instead compute
>>> >>> "CumulativeMemoryUseWithinStage". (I'm coining the term "stage"
>>> >>> to mean all operators within like processes, summing over all
>>> >>> buckets.)
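The bucketing arithmetic above can be sketched as follows (hypothetical names, purely illustrative of the numbers in the example):

```java
/** Sketch of the example: 100M rows of 100 bytes each, bucketed 5 ways,
 * gives 2GB of memory needed per process. Names are illustrative only. */
public class PartitionMemorySketch {

  /** Per-process memory = (rowCount / buckets) * averageRowSize, i.e.
   * AverageTupleSize times RowCountWithinPartition, not RowCount. */
  static double memoryPerProcess(double rowCount, int buckets,
      double averageRowSize) {
    return rowCount / buckets * averageRowSize;
  }

  public static void main(String[] args) {
    // 100M rows / 5 buckets = 20M rows; 20M rows * 100 bytes/row = 2GB.
    System.out.println(memoryPerProcess(100e6, 5, 100)); // prints 2.0E9
  }
}
```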
>>> >>> Let's suppose that, in the above data set, we have two
>>> >>> operators, each of which needs to build a hash table, and we
>>> >>> have 2GB of memory available to each process.
>>> >>> CumulativeMemoryUseWithinStage is 100M rows * 100 bytes/row * 2
>>> >>> hash tables = 20GB. So the parallelism degree should be
>>> >>> 20GB / 2GB = 10.
>>> >>>
>>> >>> 10 is a better choice of parallelism degree than 5. We lose a
>>> >>> little (more nodes used, and more overhead combining the 10
>>> >>> partitions into 1) but gain a lot (we save the cost of sending
>>> >>> the data over the network).
>>> >>>
>>> >>> Thoughts on this? How do other projects determine the degree of
>>> >>> parallelism?
>>> >>>
>>> >>> Julian
>>> >>>
>>> >
>>>
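The final worked example, as a small sketch (hypothetical names, not from the thread or the calcite-603 branch): choose the parallelism degree so that the stage's cumulative memory fits into the memory available per process.

```java
/** Sketch only: derive the parallelism degree from stage memory and
 * per-process memory, as in the example in the thread. */
public class ParallelismDegreeSketch {

  /** degree = ceil(cumulativeMemoryUseWithinStage / memoryPerProcess). */
  static long parallelismDegree(double stageMemory, double memoryPerProcess) {
    return (long) Math.ceil(stageMemory / memoryPerProcess);
  }

  public static void main(String[] args) {
    // 100M rows * 100 bytes/row * 2 hash tables = 20GB for the stage;
    // with 2GB available per process, the degree should be 20GB / 2GB = 10.
    double stageMemory = 100e6 * 100 * 2;
    System.out.println(parallelismDegree(stageMemory, 2e9)); // prints 10
  }
}
```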
