I've committed CALCITE-603 to master and pushed a new 1.1.0-incubating-SNAPSHOT to Apache Nexus.
Julian

On Feb 25, 2015, at 1:05 AM, Jesus Camachorodriguez <[email protected]> wrote:

+1

The providers and corresponding interfaces proposed in CALCITE-603 LGTM; I think they cover the needs that were arising when we were moving forward with cost-optimization/algorithm-selection in Hive's optimization phase.

Thanks,
Jesús

On 2/24/15, 7:29 PM, "Julian Hyde" <[email protected]> wrote:

I think your "phase" concept matches my "stage". I'll use "phase" too.

Agreed, we should not separate the sender and receiver of an Exchange into separate RelNodes. I didn't mean to give that impression. Maybe I should call it "isPhaseTransition" rather than "isStageLeaf".

Hive/Tez does not have a concept of threads (as distinct from processes), but I think the "split" concept will serve both Hive and Drill.

> So maybe the question is, do you delineate these as two different concepts or combine them into one (memory usage stages and parallelization change phases (e.g. exchanges))?

Really good question. However, I'm going to punt. I think there is more complexity over the horizon when we start modeling blocking operators, and phased pipelines where phase 1 starts releasing memory as phase 2 starts allocating it. "isStageLeaf" allows us to model a collection of consecutive operators that function as a single unit for purposes of memory allocation, and that's a good start. (If you want to detect a change in distribution, look at the Distribution metadata.)

On Tue, Feb 24, 2015 at 10:59 AM, Jacques Nadeau <[email protected]> wrote:

Some thoughts:

- We have generic (specific) terms we use to explain these concepts: phase (major fragment) & slice (minor fragment or thread).
- It isn't clear to me why Parallelism needs to expose stageLeaf. We are obviously aware of this fact, but I'm not sure why it should be in the framework as a specialized concept. Note that for planning we also don't separate out the sending and receiving sides of an exchange, because it is often useful to reason about both concepts at the same time (for example, affinity mapping).

To be clear, we mean phase (major fragment) as a unit between two exchanges (or leaf and root fragments, which are delineated by an exchange). Note that this is different from what we mean by stages, which is a separate concept that describes memory transition states. For example, you might have a hash join. The join will separate the build side versus the probe side as two separate stages. Other blocking or partially blocking operators may also separate stages, and memory accounting needs to understand both stages and phases.

So maybe the question is, do you delineate these as two different concepts or combine them into one (memory usage stages and parallelization change phases (e.g. exchanges))?

On Tue, Feb 24, 2015 at 10:45 AM, Julian Hyde <[email protected]> wrote:

Jesus,

That makes sense. We basically need two carts: one in front of the horse (before we've determined parallelism), and one behind (after we have determined parallelism). As I said to Jacques, you could also use the "behind" cart with a placeholder value of parallelism, but you have to be careful that you don't use this calculation to determine parallelism.

I have just checked into https://github.com/julianhyde/incubator-calcite/tree/calcite-603 a new metadata provider:

interface Size {
  /** Returns the average size of a row, in bytes. */
  Double averageRowSize();

  /** Returns the average size of each column, in bytes. */
  List<Double> averageColumnSizes();
}
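As a minimal sketch of how those two methods might relate (the handler class, the null handling, and the 4-byte default below are illustrative assumptions, not part of the patch):

import java.util.Arrays;
import java.util.List;

/** Hypothetical Size handler that derives the average row size by
 * summing the per-column averages. */
class SimpleSizeHandler {
  private final List<Double> averageColumnSizes;

  SimpleSizeHandler(List<Double> averageColumnSizes) {
    this.averageColumnSizes = averageColumnSizes;
  }

  List<Double> averageColumnSizes() {
    return averageColumnSizes;
  }

  Double averageRowSize() {
    double total = 0d;
    for (Double size : averageColumnSizes) {
      // A null entry means the column's size is unknown; assume a
      // default of 4 bytes (an arbitrary illustrative choice).
      total += (size == null) ? 4d : size;
    }
    return total;
  }

  public static void main(String[] args) {
    // An int column (4 bytes), a VARCHAR averaging 20 bytes, and a
    // column of unknown size.
    SimpleSizeHandler handler =
        new SimpleSizeHandler(Arrays.asList(4d, 20d, null));
    System.out.println(handler.averageRowSize()); // prints 28.0
  }
}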
Then I propose to add the following providers. (Recall that a metadata provider is a mix-in interface to RelNode; each method is evaluated for a particular RelNode.)

interface Parallelism {
  /** Returns true if each physical operator implementing this relational
   * expression belongs to a different process than its inputs. */
  boolean isStageLeaf();

  /** Returns the number of distinct splits of the data.
   *
   * <p>For broadcast, where each copy is the same, returns 1. */
  int splitCount();
}

interface Memory {
  /** Returns the expected amount of memory, in bytes, required by a
   * physical operator implementing this relational expression, across
   * all splits.
   *
   * <p>How much memory is used depends on the algorithm; for example,
   * an implementation of Aggregate that builds a hash table requires
   * approximately rowCount * averageRowSize bytes, whereas an
   * implementation that assumes that the input is sorted uses only
   * averageRowSize. */
  Double memory();

  /** Returns the cumulative amount of memory, in bytes, required by the
   * physical operator implementing this relational expression, and all
   * operators within the same stage, across all splits. */
  Double cumulativeMemoryWithinStage();

  /** Returns the expected cumulative amount of memory, in bytes,
   * required by the physical operator implementing this relational
   * expression, and all operators within the same stage, within each
   * split.
   *
   * <p>Basic formula:
   * cumulativeMemoryWithinStageSplit
   *     = cumulativeMemoryWithinStage / Parallelism.splitCount */
  Double cumulativeMemoryWithinStageSplit();
}

If you have not yet determined the parallelism, use cumulativeMemoryWithinStage; if you have determined parallelism, use cumulativeMemoryWithinStageSplit.
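As a minimal sketch of how these figures might compose over a plan tree (the Op stand-in class and the traversal below are illustrative assumptions, not code from the patch):

import java.util.Collections;
import java.util.List;

class MemoryEstimator {
  /** Memory required by one operator, across all splits. */
  double memory(Op op) {
    return op.selfMemory;
  }

  /** Sums this operator's memory with that of its inputs, stopping at a
   * stage leaf, whose inputs run in a different process. */
  double cumulativeMemoryWithinStage(Op op) {
    double total = memory(op);
    if (!op.isStageLeaf) {
      for (Op input : op.inputs) {
        total += cumulativeMemoryWithinStage(input);
      }
    }
    return total;
  }

  /** The basic formula from the Memory interface:
   * cumulativeMemoryWithinStage / Parallelism.splitCount. */
  double cumulativeMemoryWithinStageSplit(Op op) {
    return cumulativeMemoryWithinStage(op) / op.splitCount;
  }

  /** Minimal stand-in for a physical operator. */
  static class Op {
    final double selfMemory;
    final boolean isStageLeaf;
    final int splitCount;
    final List<Op> inputs;

    Op(double selfMemory, boolean isStageLeaf, int splitCount, List<Op> inputs) {
      this.selfMemory = selfMemory;
      this.isStageLeaf = isStageLeaf;
      this.splitCount = splitCount;
      this.inputs = inputs;
    }
  }

  public static void main(String[] args) {
    // A table scan (stage leaf, no buffering) feeding a hash-join build
    // that needs 1GB, in a stage split 5 ways.
    Op scan = new Op(0d, true, 5, Collections.<Op>emptyList());
    Op join = new Op(1e9, false, 5, Collections.singletonList(scan));
    MemoryEstimator estimator = new MemoryEstimator();
    System.out.println(estimator.cumulativeMemoryWithinStage(join));      // 1.0E9
    System.out.println(estimator.cumulativeMemoryWithinStageSplit(join)); // 2.0E8
  }
}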
What do y'all think of my terminology: split, stage, stage-leaf, process, memory? (It won't match every engine exactly, but I hope it's at least clear.)

Julian

On Tue, Feb 24, 2015 at 10:34 AM, Julian Hyde <[email protected]> wrote:

Yes, absolutely, that's the only sane way to approach this. Calcite's metadata provider model does make it possible to use an estimate of parallelism at one stage of planning, then use the real number at a later stage.

In my next email I propose some new metadata providers. Hopefully they could be fitted into Drill's planning process. Also advise whether the terminology (processes, splits, stages, memory, size) seems intuitive even though it is not tied to a particular execution engine.

Julian

On Feb 24, 2015, at 10:23 AM, Jacques Nadeau <[email protected]> wrote:

A little food for thought given our work on Drill...

We punted on trying to optimize all of this at once. Very little of the core plan exploration process is influenced directly by degree of parallelism. We do distinguish between parallelized and non-parallelized operation, and use a few "hints" during planning to make some decisions where we need to. Afterwards, we do a secondary pass where we determine parallelism and memory consumption. If our first plan turns out not to fit into available cluster memory, we then replan with more conservative memory settings. While we may lose some plans that are ideal, it makes the whole process substantially easier to reason about. We also did this because some of our initial experimentation, where we included a number of these things as part of the planning process, caused the planning time to get out of hand.

On Mon, Feb 23, 2015 at 1:26 PM, Julian Hyde <[email protected]> wrote:

I am currently helping the Hive team compute the memory usage of operators (more precisely, of a set of operators that live in the same process) using a Calcite metadata provider.

Part of this task is to create an "Average tuple size" metadata provider based on an "Average column size" metadata provider. This is fairly uncontroversial. (I'll create a JIRA case when https://issues.apache.org/ is back up, and we can discuss details such as how to compute the average size of columns computed using built-in functions, user-defined functions, or aggregate functions.)

We also want to create a "CumulativeMemoryUseWithinProcess" metadata provider. That would start at 0 for a table scan or exchange consumer, but increase for each operator building a hash table or using sort buffers, until we reach the edge of the process.

But we realized that we also need to determine the degree of parallelism, because we need to multiply AverageTupleSize not by RowCount but by RowCountWithinPartition. (If the data set has 100M rows, each 100 bytes, and is bucketed 5 ways, then each process will need memory for 20M rows, i.e. 20M rows * 100 bytes/row = 2GB.)

Now, if we are already at the stage of planning where we have determined the degree of parallelism, then we could expose this as a ParallelismDegree metadata provider. But maybe we're putting the cart before the horse? Maybe we should compute the degree of parallelism AFTER we have assigned operators to processes. We actually want to choose the parallelism degree such that we can fit all necessary data in memory. There might be several operators, all building hash tables or using sort buffers. The natural break point (at least in Tez) is where the data needs to be re-partitioned, i.e. an Exchange operator. So maybe we should instead compute "CumulativeMemoryUseWithinStage". (I'm coining the term "stage" to mean all operators within like processes, summing over all buckets.)

Let's suppose that, in the above data set, we have two operators, each of which needs to build a hash table, and we have 2GB of memory available to each process. CumulativeMemoryUseWithinStage is 100M rows * 100 bytes/row * 2 hash tables = 20GB. So the parallelism degree should be 20GB / 2GB = 10.

10 is a better choice of parallelism degree than 5. We lose a little (more nodes used, and more overhead combining the 10 partitions into 1) but gain a lot (we save the cost of sending the data over the network).

Thoughts on this? How do other projects determine the degree of parallelism?

Julian
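As a self-contained check of the arithmetic in that last example (the figures come from the email above; the class and variable names are illustrative):

/** Reproduces the arithmetic from the example above: 100M rows of
 * 100 bytes, two hash tables, and 2GB of memory per process. */
class ParallelismExample {
  public static void main(String[] args) {
    double gb = 1e9;                 // the email treats 1GB as 10^9 bytes
    double rowCount = 100_000_000d;  // 100M rows
    double averageRowSize = 100d;    // bytes per row
    int hashTables = 2;              // operators that buffer the whole input
    double memoryPerProcess = 2 * gb;

    // CumulativeMemoryUseWithinStage: each hash table buffers every row.
    double stageMemory = rowCount * averageRowSize * hashTables;

    // Smallest degree of parallelism that lets each process fit in memory.
    int parallelismDegree = (int) Math.ceil(stageMemory / memoryPerProcess);

    System.out.println(stageMemory / gb);  // 20.0 (GB)
    System.out.println(parallelismDegree); // 10
  }
}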
