[ https://issues.apache.org/jira/browse/IMPALA-8018?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Paul Rogers updated IMPALA-8018:
--------------------------------
    Description: 
The math is wrong when we estimate the cardinality for both M:1 (FK/PK) joins (IMPALA-8014) and M:N (generic) joins (IMPALA-8015). General practice is to treat the two cases the same, as described here.

TL;DR: skip to the last section for the proposed change. See IMPALA-8014 and IMPALA-8015 for details about the bugs in the current code.

h4. Background

Join logic is complex. To ensure that the analysis is sound, we work it out from first principles, then verify it against Swami & Schiefer, [On the Estimation of Join Result Sizes|https://pdfs.semanticscholar.org/2735/672262940a23cfce8d4cc1e25c0191de7da7.pdf] (S&S). Note especially section 3, Background.

h4. Definitions

The following terms are used below:

* _Relation:_ either a _base table_ or the result of a join. (Relational theory says that both are simply relations.) We use an upper case letter, typically R, for a relation. E.g.: {{R1}}, {{R2}}
* _Key:_ A column used in a join. We use a lower case letter for a column, and typically use "k" for a key, along with the relation number. E.g. {{k1}} and {{ON R1.k1a = R2.k2a}}.
* _Compound key:_ A join key made up of multiple columns. We use a letter to denote each column. E.g. {{k1a}} and {{ON R1.k1a = R2.k2a AND R1.k1b = R2.k2b}}
* _Join:_ the join of two relations. Impala supports the join of two base tables (a table-to-table join), or the join of a base table and another join (a table-to-join join.) We use the symbol {{⋈}} to denote a join. Impala typically uses a hash join.
* _Relation cardinality:_ the number of rows in the relation, denoted {{|R|}}.
* _Join cardinality:_ the cardinality of the relation produced by the join. That is, the join's output cardinality. Denoted as {{|R1 ⋈ R2|}}.
* _Key column cardinality:_ the number of unique values in a column, denoted {{|k|}}. Also known as the NDV of the column or {{ndv(k)}}.

In Impala, HMS provides the relation and join key cardinality as table and column statistics respectively.

Since Impala typically uses hash joins, it is helpful to use terminology specific to that case:

* _Probe_ side of a join: the larger side (ideally) of a hash join. Also called the "left" side. Is known as {{child(0)}} in the code. Appears at the same level as the join in the query plan (which would be to the left if the plan were rotated 90 degrees counter-clockwise.)
* _Build_ side of a join: the smaller side (ideally) of a hash join. Also called the "right" side. Is known as {{child(1)}} in the code. Appears indented directly under the join in the query plan.

In Impala:

* The detail table in an M:1 relationship is always on the probe (left) side of a join. Represented as {{child(0)}} in the code.
* The master table in an M:1 relationship is always on the build (right) side of a join. Represented as {{child(1)}} in the code.

Finally, we also need:

* The _scan_ of a table: the (possibly smaller) relation produced by applying one or more predicates while scanning a table. We are concerned with the cardinality of the scan, denoted as {{|R1'|}}. We assume that Impala has already used rules (not discussed here) to estimate the cardinality of the selection.

h4. Deriving the Join Formula For the M:1 Case

In an RDBMS, a primary key (PK) is a column (or, more typically, a set of columns) that uniquely identifies a row in the master table (M). Primary keys are generally indexed via a unique index. Since keys are unique:

{noformat}
|M.pk| = |M|
{noformat}

The detail table forms an M:1 relationship with the master table. Each foreign key (FK) in the detail table (D) references one primary key in the master table. Because of the M:1 relationship:

{noformat}
|D.fk| <= |M| << |D|
{noformat}

If we read all rows from both tables, and all primary keys appear as foreign keys, then the join cardinality is simply:

{noformat}
|D ⋈ M| = |D|                  [Equation 1]
{noformat}

h5. Filtering Master Rows

Let's consider what happens when we filter out master (M) table rows, producing a subset M'. To do so, we make three assumptions:

* A uniform distribution of foreign keys (the "Uniformity" assumption in the S&S paper cited above),
* Every primary key is referenced by an equal number of foreign keys (implied by the uniformity assumption),
* The filter results in a random sampling of master rows not correlated with the join keys.

A later section will relax the third assumption.

Then the probability of any particular primary key appearing is:

{noformat}
p(pk) = |M'| / |M|
{noformat}

The result is a probability (hence the {{p()}} function) given by the ratio of selected rows to total table rows (from basic probability theory.) The value runs from 0 (no master rows match the scan predicates) to 1 (all rows present).

The revised join cardinality is:

{noformat}
|D ⋈ M'| = |D| * p(pk)
         = |D| * |M'| / |M|    [Equation 2]
{noformat}

h5. Filtering Detail Rows

Suppose we instead filter detail rows to produce a new subset D'. Again we make some assumptions:

* Uniform distribution of foreign key values across primary keys.
* The "Containment" assumption from the S&S paper: the set of foreign keys is a (possibly full) subset of the set of primary keys.
* The filter results in a random sampling of detail rows not correlated with the join keys.

Again, a later section will relax the third assumption.

With these assumptions, we can see that if we join D' with the master table M, every row that remains in D' will still find a match in M, so:

{noformat}
|D' ⋈ M| = |D'|
{noformat}

h5. Filtering Both Master and Detail Rows

Continuing with the assumption that filters are not correlated with keys, we can combine the two selection models to produce:

{noformat}
|D' ⋈ M'| = |D'| * |M'| / |M|  [Equation 3]
{noformat}

h5. Non-Containment

The above is based on the S&S Containment assumption: that the set of foreign keys is a subset of the set of primary keys.
In a normal RDBMS with integrity constraints, this is a valid assumption. But in Big Data, things are messy and we can't actually make this assumption. Fortunately, there is a way to loosen the containment assumption.

Suppose the master table has half as many keys as the detail table. Using the uniformity assumption, half the foreign keys will go unmatched. If there are four times as many foreign keys as primary keys, only a quarter of the detail rows will find matches. The probability of any one foreign key finding a match is:

{noformat}
p(match) = |M.pk| / |D.fk|
{noformat}

The above applies only if {{|D.fk| > |M.pk|}}; no adjustment is needed if {{|D.fk| <= |M.pk|}}. We can express this mathematically as:

{noformat}
p(match) = |M.pk| / max(|D.fk|, |M.pk|)    [Equation 4]
{noformat}

Combining this with Equation 3, recalling that {{|M.pk| = |M|}}:

{noformat}
|D' ⋈ M'| = |D'| * (|M'| / |M|) * p(match)
          = |D'| * |M'| / |M| * |M.pk| / max(|D.fk|, |M.pk|)
          = |D'| * |M'| * (|M| / |M|) / max(|D.fk|, |M.pk|)
          = |D'| * |M'| / max(|D.fk|, |M|)
{noformat}

If we rearrange the above we get:

{noformat}
              |D'| * |M'|
|D' ⋈ M'| = ----------------     [Equation 5]
            max(|D.fk|, |M|)
{noformat}

h4. The S&S Equation

Let's check our answer against the S&S paper cited above by assuming the master/detail relationship and removing the effect of scans from our equation. The paper's equation:

{noformat}
|R1 ⋈ R2| = min( |k1|, |k2| ) * ( |R1| / |k1| ) * ( |R2| / |k2| )
          = |R1| * |R2| / max( |k1|, |k2| )
{noformat}

If we assume:

* {{R1 = D}}
* {{R2 = M}}
* {{k1 = D.fk}}
* {{k2 = M.pk}}
* {{|D.fk| > |M.pk|}}

We get:

{noformat}
|D ⋈ M| = |D| * |M| / max( |D.fk|, |M.pk| )
{noformat}

Which is the same as Equation 5 (assuming no selection). All good.

h4. Compound Keys

The discussion thus far has assumed a single key column per table so that we have NDV values for each.
The next challenge occurs if the query has compound keys:

{noformat}
SELECT …
FROM T1, T2
WHERE T1.k1a = T2.k2a AND T1.k1b = T2.k2b
{noformat}

We will use the symbol {{j}} to mean a joint (compound) key. The joint key for table {{T1}} is {{T1.j1}} and {{|T1.j1|}} is the cardinality (NDV) of that key.

In an RDBMS, if an index exists on (k1a, k1b) and (k2a, k2b) then it is possible to determine the joint NDV (which is just the number of index entries.) In HMS, we have the individual NDVs, but not the joint NDV.

The S&S paper supplies the required assumption: "Independence: Within each relation, values chosen from distinct columns that are involved in join predicates are independent." This lets us estimate the joint NDV.

One way to think of this is that the columns form a matrix, say (year, month). Every year has 12 months, so there is no correlation between the two. If we have data for 10 years, we will have 10 * 12 = 120 distinct key values.

{noformat}
|T.j| = |T.k1| * |T.k2| * … * |T.kn|
      = ∏ |T.ki|
{noformat}

Where {{∏}} is the product operator. (S&S, section 3.3, calls this the "Multiplicative Rule.")

Of course, keys are not always independent. Suppose the keys are (state, city). In this case, there are 50 states. But most city names occur in only one state, so NDV(state) * NDV(city) >> NDV(state, city). Still, we know that, regardless of the correlation, we can never have more keys than rows. We can adjust the expression to:

{noformat}
|T.j| = min( ∏ |T.ki|, |T| )
{noformat}

We can also assume that joins are typically formulated to be as selective as possible, so the assumption may not be too far off. Still, if a file contains addresses, so that the full key is (state, city, street), and we join on just the (state, city) prefix, the joint NDV will be smaller than the table cardinality. So we need both terms above.
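
The capped multiplicative rule above can be illustrated with a minimal Python sketch. The function name {{joint_ndv}} and all of the numbers are hypothetical and chosen only for illustration; they are not taken from the Impala code or from real table statistics.

```python
from math import prod

def joint_ndv(column_ndvs, row_count):
    """Estimate the NDV of a compound key as min(product of column NDVs, row count)."""
    return min(prod(column_ndvs), row_count)

# (year, month): independent columns, 10 years of data, 1,000,000 rows (made-up numbers).
year_month = joint_ndv([10, 12], 1_000_000)

# (state, city): correlated columns; the product (1,000,000) overshoots,
# so the row count (300,000, also made up) caps the estimate.
state_city = joint_ndv([50, 20_000], 300_000)

print(year_month, state_city)
```

In the first call the product term wins (120 keys); in the second the row-count cap wins, which is the situation the (state, city) example describes.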

Adding this to the expression derived above:

{noformat}
                             |D'| * |M'|
|D' ⋈ M'| = ---------------------------------------------     [Equation 6]
            max(min(∏ |D.fki|, |D|), min(∏ |M.pki|, |M|))
{noformat}

h4. Correlated Filters

Suppose that we relax the assumption that filters are not correlated with join keys and assume they are. For example:

{noformat}
SELECT ...
FROM master m, detail d
WHERE m.pk = d.fk
  AND m.pk = 10
{noformat}

The simplified math used above no longer applies: the filter affects the number of join keys available. Applying Equation 5 in this case results in a "double accounting" for the primary key filter.

Equation 5 has four terms. Consider a predicate on the master table:

* The predicate reduces the number of table rows from {{|M|}} to {{|M'|}}.
* The predicate also reduces the number of keys from {{|M.pk| (= |M|)}} to {{|M.pk'| (= |M'|)}}.

Equation 5 does not consider the reduction in key cardinality. Let's add that now:

{noformat}
                 |D'| * |M'|
|D' ⋈ M'| = ---------------------     [Equation 7]
            max(|D.fk'|, |M.pk'|)
{noformat}

For the specific example above:

{noformat}
|M.pk'| = |M.pk| * sel(M.pk = 10)
        = |M.pk| * 1/|M.pk|
        = 1

|M'| = |M| * sel(M.pk = 10)
     = |M| * 1/|M.pk|
     = 1

|D ⋈ M'| = |D| * |M'| / max(|D.fk|, |M.pk'|)
         = |D| * 1 / max(|D.fk|, 1)
         = |D| / |D.fk|
         = |D| / |M|
{noformat}

Which is correct: we'll return a single master record and its details, which is the average number of details per master. The same logic works if we filter on the detail side:

{noformat}
SELECT ...
FROM master m, detail d
WHERE m.pk = d.fk
  AND d.fk = 10
{noformat}

So:

{noformat}
|D.fk'| = |D.fk| * sel(D.fk = 10)
        = |D.fk| * 1/|D.fk|
        = 1

|D'| = |D| * sel(D.fk = 10)
     = |D| / |D.fk|

|D' ⋈ M| = |D'| * |M| / max(|D.fk'|, |M.pk|)
         = (|D| / |D.fk|) * |M| / |M.pk|
         = |D| / |M|
{noformat}

It also works if we filter on both sides (which Impala does implicitly):

{noformat}
SELECT ...
FROM master m, detail d
WHERE m.pk = d.fk
  AND m.pk = 10
  AND d.fk = 10
{noformat}

Some of the steps are the same as earlier, so let's focus on the join:

{noformat}
|D' ⋈ M'| = |D'| * |M'| / max(|D.fk'|, |M.pk'|)
          = (|D| / |D.fk|) * 1 / max(1, 1)
          = |D| / |M|
{noformat}

Now, suppose we place a broad filter on the master table:

{noformat}
SELECT ...
FROM master m, detail d
WHERE m.pk = d.fk
  AND m.balance > 1234
{noformat}

Let's apply the same math, assuming {{sel(m.balance > 1234) = 1/3}}.

{noformat}
|M'| = |M| * sel(m.balance > 1234)
     = |M| / 3

|M.pk'| = |M.pk| * sel(m.balance > 1234)
        = |M.pk| / 3
        = |M'|

|D ⋈ M'| = |D| * |M'| / max(|D.fk|, |M.pk'|)
         = |D| * (|M| / 3) / max(|D.fk|, |M.pk| / 3)
         = (|D| / 3) * |M| / |D.fk|
         = |D| / 3
{noformat}

Remembering, again, that {{|D.fk| = |M.pk| = |M|}}.

h4. Compound Keys with Correlated Filters

If the join key is compound (two or more columns), then we apply the selectivity either to each key as we multiply them, or to the final product:

{noformat}
|T.k'| = ∏ |T.ki| * sel(T)
       = sel(T) * ∏ |T.ki|
{noformat}

That is, we assume that the filter reduces the cardinality of all keys equally. This may require more thought to verify.

The key cardinality is available in metadata. The predicate selectivity is not currently available. Since the scan already computes it in order to compute the scan cardinality, the scan node can simply save the selectivity for use by the join.

Plugging this into Equation 6:

{noformat}
                         |L'| * |R'|
|L' ⋈ R'| = -----------------------------------------     [Equation 8]
            max(sel(L) * ∏ |L.ki|, sel(R) * ∏ |R.ki|)
{noformat}

With that, it is a simple matter to compute the join cardinality.

h4. Generalizing the Correlated Filter Case

A complication then occurs. The next join will have the present join as one of its input relations. What selectivity should this join report?

We need the following:

* Table cardinalities: {{|L|}} and {{|R|}}, which can be base tables or joins.
* The local predicates for each table, along with their selectivities {{sel(L)}} and {{sel(R)}}.
* The NDV values for the join predicates {{|L.k1|}} and {{|R.k2|}}.

From these we can work out the math. As it turns out, the scan nodes already compute the scan cardinalities for us: {{|L'|}} and {{|R'|}}. The join must work out the other terms. The key cardinalities are worked out by applying the scan predicate selectivity to the join key NDVs:

{noformat}
|T.k'| = |T.k| * sel(T)

                 |D'| * |M'|
|D' ⋈ M'| = ---------------------
            max(|D.fk'|, |M.pk'|)

            |D| * |M|
sel(join) = ---------
            |D' ⋈ M'|

|D'| = |D| * sel(D)
|M'| = |M| * sel(M)
{noformat}

The top term then becomes:

{noformat}
|D| * |M| = (|D'| / sel(D)) * (|M'| / sel(M))
{noformat}

So the whole equation is:

{noformat}
sel(join) = (|D'| / sel(D)) * (|M'| / sel(M)) / |D' ⋈ M'|
{noformat}

Or

{noformat}
                    |D'| * |M'|
sel(join) = ---------------------------     [Equation 9]
            |D' ⋈ M'| * sel(D) * sel(M)
{noformat}

All the above terms are available in the planner. The selectivity computed above then becomes the input to the next join.

h4. M:N (Generic) Case

Join discussions are more intuitive when framed in terms of the M:1 (FK/PK) case. But the math works just as well for the M:N (generic) case.

Assume a join of two tables, left ({{L}}) and right ({{R}}), with no predicate. We have a Cartesian product:

{noformat}
|L ⋈ R| = |L| * |R|
{noformat}

Suppose we have an equi-join predicate: {{L.k1 = R.k2}}. We can now use a hash join in which L is on the probe side and R is on the build side.

Because we are concerned with the M:N (generic) case, we assume that {{|R.k2| < |R|}}. This means that multiple build rows have the same key, say {{R.k2 = x}}. This then implies that each row of the L (probe) side will potentially match multiple rows on the R (build) side. We want to know: how many rows will each probe-side row match?

Let's focus on a table-to-table join and assume we can obtain the following from HMS:

* Probe and build table cardinalities: {{|L|}} and {{|R|}}
* Key cardinalities (NDVs): {{|L.k1|}} and {{|R.k2|}}

The probe side will match rows on the build side where {{R 𝜎 R.k2 = L.k1}}. Using the uniformity assumption from the S&S paper (see IMPALA-8014) we can see that the values of R.k2 divide the R table into a set of {{|R.k2|}} groups, the size of each of which must be:

{noformat}
|R 𝜎 R.k2=x| = |R| / |R.k2|
{noformat}

Let's assume that every row on the L side matches some row on the R side. Then, the join cardinality is just:

{noformat}
|L ⋈ R| = |L| * |R| / |R.k2|
{noformat}

Both the L and R tables may be subject to selection during scan (see IMPALA-8014) that is an unbiased sampling (given the uniformity assumption) of the rows of both tables. This reduces the rows available to join, but does not reduce the population of groups from which the sample is drawn. So:

{noformat}
|L' ⋈ R'| = |L'| * |R'| / |R.k2|
{noformat}

Intuitively, however many rows are scanned, they are still divided into the same set of groups.

The above assumes that all rows from L match rows in R. (This is called the "containment assumption" in the S&S paper.) But Big Data is messy. Perhaps there are more key values in L than R or vice versa. We can make some reasonable assumptions:

* If there are fewer values in L.k1 than in R.k2, we can assume all probe rows will match a build key.
* If there are more values in L.k1 than in R.k2, we can assume we'll match all keys on the build side, then discard the extra probe values that don't match.

Again using the uniformity assumption, the probability is simply the number of keys available for matching (the right, or build, side) divided by the number of keys we want to match (the left, or probe, side):

{noformat}
p(match) = |R.k2| / |L.k1|    if |L.k1| > |R.k2|,
           1                  otherwise

         = |R.k2| / max( |L.k1|, |R.k2| )
{noformat}

Let's check.

* If the NDVs are equal, the probability of a match is 1.
* If either table is empty, the probability is 0.
* If the number of probe keys ({{|L.k1|}}) is half the number of build keys ({{|R.k2|}}), then all probe rows will find a match, so the probability is 1 (though half of the build side rows will go unmatched.)
* If we have twice as many probe keys ({{|L.k1|}}) as build keys ({{|R.k2|}}), then half the probe rows won't find a match and the probability of a match is 0.5.

All good. Putting it all together:

{noformat}
|L' ⋈ R'| = |L'| * |R'| / |R.k2| * p(match)
          = (|L'| * |R'| / |R.k2|) * |R.k2| / max(|R.k2|, |L.k1|)
          = |L'| * |R'| / max(|R.k2|, |L.k1|)
{noformat}

Rearranging terms, we get the M:N cardinality estimation expression:

{noformat}
                |L'| * |R'|
|L' ⋈ R'| = -------------------
            max(|L.k1|, |R.k2|)
{noformat}

As it turns out, this is exactly Equation 2 in the S&S paper, which provides confirmation that the derivation is correct. It is also the same (except for names) as Equation 5 above, showing that the M:1 and M:N cases are the same mathematically.

Said another way, as the M:N groups on the "N" side get ever smaller, they will converge on a group size of 1, which is the M:1 case. So, the M:1 (FK/PK) formula *must* be a special case of the more generic M:N formula.

h4. Proposed Fix

All of this leads to the proposed fix. To gather the details from the above discussion, modify the planner as follows:

* Add node selectivity as state on each node so that it can be obtained by the join node.
* For the Scan node, retain the selectivity already computed.
* For the Join node, compute selectivity as shown below.
* Modify the Join node to compute join cardinality using the equations below.
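
As a rough illustration of the steps listed above, here is a minimal Python sketch of the cardinality part of the change, using the key-cardinality and join-cardinality formulas stated in this section. Python is used only for brevity. {{PlanInput}}, {{key_cardinality}} and {{join_cardinality}} are hypothetical names, not Impala planner classes or methods; the zero-denominator guard is an added assumption; and the {{sel(join)}} step is deliberately left out.

```python
from dataclasses import dataclass
from math import prod
from typing import List

@dataclass
class PlanInput:
    """Hypothetical stand-in for a planner node that feeds a join."""
    cardinality: float      # |T'|: output cardinality of the node
    selectivity: float      # sel(T): selectivity saved on the node
    key_ndvs: List[float]   # NDV of each join key column on this side

def key_cardinality(node: PlanInput) -> float:
    # |T.k'| = min(sel(T) * product of key NDVs, |T'|)
    return min(node.selectivity * prod(node.key_ndvs), node.cardinality)

def join_cardinality(left: PlanInput, right: PlanInput) -> float:
    # |L' join R'| = |L'| * |R'| / max(|L.k'|, |R.k'|)
    denom = max(key_cardinality(left), key_cardinality(right))
    if denom <= 0:
        # Assumption made for this sketch: no keys on either side means no output rows.
        return 0.0
    return left.cardinality * right.cardinality / denom

# Made-up statistics: an unfiltered detail scan joined to a master scan
# that keeps a quarter of its rows.
detail = PlanInput(cardinality=1_000_000, selectivity=1.0, key_ndvs=[1_000])
master = PlanInput(cardinality=250, selectivity=0.25, key_ndvs=[1_000])
estimate = join_cardinality(detail, master)
print(estimate)
```

With these numbers the estimate is a quarter of the detail rows, which mirrors the broad-filter example earlier, where a master filter with selectivity 1/3 yields {{|D| / 3}}.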

We assume the following are available from the input nodes:

* {{|T'|}}, the node output cardinality
* {{sel(T)}}, the selectivity of that node

{noformat}
|L.k'| = min(sel(L) * ∏ |L.ki'|, |L'|)
|R.k'| = min(sel(R) * ∏ |R.ki'|, |R'|)

                |L'| * |R'|
|L' ⋈ R'| = -------------------
            max(|L.k'|, |R.k'|)

                    |L'| * |R'|
sel(join) = ---------------------------
            |L' ⋈ R'| * sel(L) * sel(R)
{noformat}


  was:
IMPALA-8014 discussed the math for FK/PK (M:1) cardinality estimation (and a bug in that logic.) IMPALA-8015 similarly discussed the math for generic (M:N) cardinality estimation (and a bug.)

The math in those two tickets suggests that the code is a bit overly complex: that there are not, in fact, two cases ("PK/FK" and "generic") but just one, at least for cardinality estimation.

Consider the generic formula worked out in IMPALA-8015:

{noformat}
p(match) = ndv(r.b) / ndv(l.a) if ndv(r.b) < ndv(l.a),
           ndv(l.a) / ndv(r.b) otherwise

|l >< r| = |scan(l)| * |scan(r)| / ndv(r.b) * p(match)
{noformat}

Intuitively, each row from the left (probe) side matches a group of rows on the right-hand (build) side. How big is that group? It is the total number of right-hand rows divided by the NDV of the right-hand key (assuming uniform key distribution). That is:

{noformat}
|r 𝜎 r.b=x| = |r| / ndv(r.b)
{noformat}

A query typically applies filters to the right table. Assuming a uniform reduction via a non-key filter:

{noformat}
|scan(r 𝜎 r.b=x)| = |scan(r)| / ndv(r.b)
{noformat}

The formula reads "the cardinality of a selection of table r where r.b equals some key value x". That's a mouthful, so let's abbreviate:

{noformat}
|r.b| = |r| / ndv(r.b)
|scan(r.b)| = |scan(r)| / ndv(r.b)
{noformat}

One thing to notice is that the same NDV value appears in both cases. Applying a filter should reduce the NDV. But we are concerned with the M:N case, so the right-hand filter simply reduces the size of each group, not the number of groups.
Of course, if filtering is severe enough, it will eliminate entire groups. This shows up because the ratio {{|scan(r)| / ndv(r.b)}} will become fractional and represent the probability that any member of the group is available for join. This means that the single formula works for all cases: full table, partial table and very heavy selection.

We can now rewrite the generic-case equation from IMPALA-8015:

{noformat}
|l >< r| = |scan(l)| * |scan(r)| / ndv(r.b) * p(match)
         = |scan(l)| * |scan(r.b)| * p(match)
{noformat}

The probability term handles the case that there are more keys on one side than the other, so that some rows won't find a match. We assume that if the same number of keys exist on both sides, they are the same keys. (If not, the user would likely not have bothered to create a join predicate that equates them.)

What happens as the number of right-hand keys increases? The group size decreases. The largest that {{ndv(r.b)}} can get is {{|r|}}, so in the limit the group size goes to 1. Let's assume we know from metadata that {{ndv(r.b) = |r|}}. Then we have a group size of 1 and we can ignore the group size term, so:

{noformat}
|l >< r| = |scan(l)| * |scan(r.b)| * p(match)
|r.b| = 1
|l >< r| = |scan(l)| * |scan(r.b)| * p(match)
         = |scan(l)| * 1 * p(match)
         = |scan(l)| * p(match)
{noformat}

Now, let's recall the formula used in the M:1 ("PK/FK") case:

{noformat}
|d >< m| = |d| * p(match)
{noformat}

They are identical. But what about that {{p(match)}} term? Perhaps it is different in the two cases. In the one case, it tells us an M:N probability, in the other it tells us the M:1 probability. Let's check:

{noformat}
p(match) = ndv(d.fk) / ndv(m.pk) if ndv(d.fk) < ndv(m.pk),
           ndv(m.pk) / ndv(d.fk) otherwise.

p(match) = ndv(r.b) / ndv(l.a) if ndv(r.b) < ndv(l.a),
           ndv(l.a) / ndv(r.b) otherwise
{noformat}

Except for a difference in names, the formulas are identical.
This means that the M:1 case is automatically handled by the M:N case, with 1 simply a degenerate value for N. This, in turn, means that Impala need not implement two cardinality estimation paths; a single path will suffice.

Let's double check. How does the code pick the M:1 path? By considering whether {{ndv(r.b) = |r|}}. If so, then {{r}} is actually {{m}} (a master table) and {{l}} is {{d}} (a detail table) and we have the FK/PK case. On the other hand, if {{ndv(r.b) < |r|}}, we have an M:N case, called the generic case in Impala, in which each row in {{l}} matches multiple rows in {{r}}.

But what if {{ndv(r.b)}} is less than {{|r|}} by a small amount, say 10%? That means each row in {{l}} will match mostly one row, but about 10% of the time it will match two. So, the generic formula for this case should be close to the FK/PK formula. What if the difference is 5% or 1%? The generic formula should match the FK/PK formula even more closely. Finally, when the difference is 0%, the generic formula must converge with the FK/PK formula.

Said another way, as the M:N groups on the "N" side get ever smaller, they will converge on a group size of 1, which is the M:1 case.

Given all this, the M:1 (FK/PK) formula *must* be a special case of the more generic M:N formula. The consequence is that the code should implement the math once, not twice.


> Unify cardinality math for PK/FK and generic cases
> --------------------------------------------------
>
>                 Key: IMPALA-8018
>                 URL: https://issues.apache.org/jira/browse/IMPALA-8018
>             Project: IMPALA
>          Issue Type: Improvement
>          Components: Frontend
>    Affects Versions: Impala 3.1.0
>            Reporter: Paul Rogers
>            Priority: Minor
>
> The math is wrong when we estimate the cardinality for both M:1 (FK/PK joins)
> (IMPALA-8014) and for M:N (generic) joins (IMPALA-8015). General practice is
> that the two cases are treated the same as described here.
> TL;DR: skip to the last section for the proposed change.
See IMPALA-8014 and > IMPALA-8015 for details about the bugs in the current code. > h4. Background > Join logic is complex. To ensure that the analysis is sound we work it out > from first principles, then verify against the Swami & Schiefer, [On the > Estimation of Join Result > Sizes|https://pdfs.semanticscholar.org/2735/672262940a23cfce8d4cc1e25c0191de7da7.pdf] > (S&S). Note especially section 3, Background. > h4. Definitions > The following terms are used below: > * _Relation:_ either a _base table_ or the result of a join. (Relational > theory says that both are simply relations.) We use an upper case letter, > typically R, for a relation. E.g.: {{R1}}, {{R1}} > * _Key:_ A column used in a join. We use a lower case letter for a column, > and typically use “k” for a key, along with the relation number. E.g. {{k1}} > and {{ON R1.k1a = R2.k2a}}. > * _Compound key:_ A join key made up of multiple columns. We use a letter to > denote each column. E.g. {{k1a}} and {{ON R1.k1a = R2.k2a AND R1.k1b = > R2.k2b}} > * _Join:_ the join of two relations. Impala supports the join of two base > tables (a table-to-table join), or the join of a base table and another join > (a table-to-join join.) We use the symbol {{⋈}} to denote a join. Impala > typically uses a hash join. > * _Relation cardinality:_ the number of rows in the relation, denoted > {{|R|}}. > * _Join cardinality:_ the cardinality of the relation produced by the join. > That is, the join’s output cardinality. Denoted as {{|R1 ⋈ R2|}}. > * _Key column cardinality:_ the number of unique values in a column, denoted > {{|k|}}. Also known as the NDV of the column or {{ndv(k)}}. > In Impala, HMS provides the relation and join key cardinality as table and > column statistics respectively. > Since Impala typically uses hash joins, it is helpful to use terminology > specific to that case: > * _Probe_ side of a join: the larger side (ideally) of a hash join. Also > called the “left” side. 
Is known as {{chiild(0)}} in the code. Appears at the > same level as the join in the query plan (which would be to the left if the > plan were rotated 90 degrees counter-clockwise.) > * _Build_ side of a join: the smaller side (ideally) of a hash join. Also > called the “right” side. Is known as {{child(1)}} in the code. Appears > indented directly under the join in the query plan. > In Impala: > * The detail table in a M:1 relationship is always on the probe (left) side > of a join. Represented as {{child(0)}} in the code. > * The master table in a M:1 relationship is always on the build (right) side > of a join. Represented as {{child(1)}} in the code. > Finally, we also need: > * The _scan_ of a table: the (possibly smaller) relation produced by > applying one or more predicates while scanning a table. We are concerned with > the cardinality of the scan, denoted as {{|R1'|}}. We assume that Impala has > already used rules (not discussed here) to estimate the cardinality of the > selection. > h4. Deriving the Join Formula For the M:1 Case > In RDBMS, a primary key (PK) is a column (or, more typically, set of columns) > that uniquely identify a row in the master table (M). Primary keys are > generally indexed via a unique index. Since keys are unique: > {noformat} > |M.pk| = |M| > {noformat} > The Detail table forms a M:1 relationship with the master table. Each foreign > key (FK) in the detail table (D) references one primary key in the master > table. Because of the M:1 relationship: > {noformat} > |D.fk| <= |M| << |D| > {noformat} > If we read all rows from both tables, and all primary keys appear as foreign > keys, the the join cardinality is simply: > {noformat} > |D ⋈ M| = |D| [Equation 1] > {noformat} > h5. Filtering Master Rows > Let’s consider what happens when we filter out master (M) table rows > producing a subset M'. 
To do so, we make three assumptions: > * A uniform distribution of foreign keys, (the “Uniformity” assumption in the > S&S paper cited above), > * Every primary key is referenced by an equal number of foreign keys (implied > by the uniformity assumption), > * The filter results in a random sampling of master rows not correlated with > the join keys. > A later section will relax the third assumption. > Then the probability of any particular primary key appearing is: > {noformat} > p(pk) = |M'| / |M| > {noformat} > The result value is a probability (hence the {{p()}} function) given by the > ratio of selected rows to total table rows (from basic probability theory.) > The value runs from 0 (no master rows match the scan predicates) to 1 (all > rows present). > The revised join cardinality is: > {noformat} > |D ⋈ M'| = |D| * p(pk) > = |D| * |M'| / |M| [Equation 2] > {noformat} > h5. Filtering Detail Rows > Suppose we instead filter detail rows to produce a new subset D'. Again we > make some assumptions: > * Uniform distribution of foreign key values across primary keys. > * The "Containment" assumption from the S&S paper that the set of foreign > keys is a (possibly full) subset of the set of primary keys. > * The filter results in a random sampling of detail rows not correlated with > the join keys. > Again, a later section will relax the third assumption. > With these assumptions, we can see that if we join D' with the master table > M, every row that remains in D' will still find a match in M, so: > {noformat} > |D' ⋈ M| = |D'| > {noformat} > h5. Filtering Both Master and Detail Rows > Continuing with the assumption that filters are not correlated with keys, we > can combine the two selection models to produce: > {noformat} > |D' ⋈ M'| = |D'| * |M'| / |M| [Equation 3] > {noformat} > h5. Non-Containment > The above is based on the S&S Containment assumption: that the set of foreign > keys is a subset of the set of primary keys. 
In a normal RDBMS with integrity > constraints, this is a valid assumption. But, in Big Data, things are messy > and we can’t actually make this assumption. Fortunately, there is a way > loosen the containment assumption. > Suppose the master table has half the number of keys as the detail table. > Using the uniformity assumption, half the foreign keys will go unmatched. If > there are four times as many foreign keys as primary keys, only a quarter of > the detail rows will find matches. The probability of any one foreign key > finding a match is: > {noformat} > p(match) = |M.pk| / |D.fk| > {noformat} > The above applies only if {{|D.fk| > |M.pk|}}; no adjustment is needed if > {{|D.fk| < |M.pk|}}. We can express this mathematically as: > {noformat} > p(match) = |M.pk| / max(|D.fk|, |M.pk|) [Equation 4] > {noformat} > Combining this with Equation 3, recalling that {{|M.pk| = |M|}}: > {noformat} > |D' ⋈ M'| = |D'| * (|M'| / |M|) * p(match) > = |D'| * |M'| / |M| * |M.pk| / max(|D.fk|, |M.pk|) > = |D'| * |M'| * (|M| / |M|) / max(|D.fk|, |M.pk|) > = |D'| * |M'| / max(|D.fk|, |M|) > {noformat} > If we rearrange the above we get: > {noformat} > |D'| * |M'| > |D' ⋈ M'| = ---------------- [Equation 5] > max(|D.fk|, |M|) > {noformat} > h4. The S&S Equation > Let’s check out answer against the S&S paper cited above by assuming the > master/detail relationship and removing from our equation the affect of > scans. The paper’s equation: > {noformat} > |R1 ⋈ R2| = min( |k1|, |k2| ) * ( |R1| / |k1| ) * ( |R2| * |k2| ) > = |R1| * |R2| / max( |k1|, |k2| ) > {noformat} > If we assume: > * {{R1 = D}} > * {{R2 = M}} > * {{k1 = D.fk}} > * {{k2 = M.pk}} > * {{|D.fk| > |M.pk|}} > We get: > {noformat} > |D ⋈ M| = |D| * |M| / max( |D.fk|, |M.pk| ) > {noformat} > Which is the same as Equation 5 (assuming no selection). All good. > h4. Compound Keys > The discussion thus far has assumed a single key column per table so that we > have NDV values for each. 
The next challenge occurs if the query has compound keys:
{noformat}
SELECT …
FROM T1, T2
WHERE T1.k1a = T2.k2a AND T1.k1b = T2.k2b
{noformat}
We will use the symbol {{j}} to mean a joint (compound) key. The joint key for table {{T1}} is {{T1.j1}} and {{|T1.j1|}} is the cardinality (NDV) of that key.

In an RDBMS, if an index exists on (k1a, k1b) and (k2a, k2b) then it is possible to determine the joint NDV (which is just the number of index entries.) In HMS, we have the individual NDVs, but not the joint NDV.

The S&S paper supplies the required assumption: “Independence: Within each relation, values chosen from distinct columns that are involved in join predicates are independent.” This lets us estimate the joint NDV.

One way to think of this is that the columns form a matrix, say (year, month). Every year has 12 months, so there is no correlation between the two. If we have data for 10 years, we will have 10 * 12 = 120 distinct key values. In general:
{noformat}
|T.j| = |T.k1| * |T.k2| * … * |T.kn|
      = ∏ |T.ki|
{noformat}
Where {{∏}} is the product operator. (S&S, section 3.3, calls this the "Multiplicative Rule.")

Of course, keys are not always independent. Suppose the keys are (state, city). In this case, there are 50 states. But most city names occur in only one state, so NDV(state) * NDV(city) >> NDV(state, city). Still, we know that, regardless of the correlation, we can never have more keys than rows. We can adjust the expression to:
{noformat}
|T.j| = min( ∏ |T.ki|, |T| )
{noformat}
We can also assume that joins are typically formulated to be as selective as possible. So the assumption may not be too far off. Still, if a file contains addresses, so the key is (state, city, street), and we join on just the (state, city) prefix, the joint NDV will be smaller than the table cardinality. So we need both terms above.
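The capped multiplicative rule can be sketched in a few lines of Python. This is only an illustration: the function name {{joint_ndv}} and the (state, city) numbers are made up for the example and are not part of Impala.

```python
from math import prod


def joint_ndv(column_ndvs, row_count):
    """Estimate |T.j| = min(prod(|T.ki|), |T|).

    column_ndvs: per-column NDVs of the compound key (hypothetical input).
    row_count:   table cardinality |T|.
    """
    return min(prod(column_ndvs), row_count)


# (year, month): independent columns, so the product is the joint NDV.
year_month = joint_ndv([10, 12], 1_000_000)

# (state, city): correlated columns. The product (50 * 20_000) overshoots,
# so the estimate is capped at the (made-up) table cardinality.
state_city = joint_ndv([50, 20_000], 30_000)

print(year_month, state_city)
```

In the second call the cap is what keeps the estimate sane; the product alone would claim more keys than rows.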
Adding this to the expression derived above:
{noformat}
            |D'| * |M'|
|D' ⋈ M'| = ---------------------------------------------    [Equation 6]
            max(min(∏ |D.fki|, |D|), min(∏ |M.pki|, |M|))
{noformat}
h4. Correlated Filters

Suppose that we relax the assumption that filters are not correlated with join keys and assume they are. For example:
{noformat}
SELECT ...
FROM master m, detail d
WHERE m.pk = d.fk
  AND m.pk = 10
{noformat}
The simplified math used above no longer applies: the filter affects the number of join keys available. Applying Equation 5 in this case results in a "double accounting" for the primary key filter.

Equation 5 has four terms. Consider a predicate on the master table:
* The predicate reduces the number of table rows from {{|M|}} to {{|M'|}}.
* The predicate also reduces the number of keys from {{|M.pk| (= |M|)}} to {{|M.pk'| (= |M'|)}}.

The above equation does not consider the reduction in key cardinality. Let's add that now:
{noformat}
            |D'| * |M'|
|D' ⋈ M'| = ---------------------    [Equation 7]
            max(|D.fk'|, |M.pk'|)
{noformat}
For the specific example above:
{noformat}
|M.pk'| = |M.pk| * sel(M.pk = 10)
        = |M.pk| * 1/|M.pk|
        = 1

|M'| = |M| * sel(M.pk = 10)
     = |M| * 1/|M.pk|
     = 1

|D ⋈ M'| = |D| * |M'| / max(|D.fk|, |M.pk'|)
         = |D| * 1 / max(|D.fk|, 1)
         = |D| / |D.fk|
         = |D| / |M|
{noformat}
Which is correct: we'll return a single master record and its details, and the number of those details is the average number of details per master. The same logic works if we filter on the detail side:
{noformat}
SELECT ...
FROM master m, detail d
WHERE m.pk = d.fk
  AND d.fk = 10
{noformat}
So:
{noformat}
|D.fk'| = |D.fk| * sel(D.fk = 10)
        = |D.fk| * 1/|D.fk|
        = 1

|D'| = |D| * sel(D.fk = 10)
     = |D| / |D.fk|

|D' ⋈ M| = |D'| * |M| / max(|D.fk'|, |M.pk|)
         = (|D| / |D.fk|) * |M| / |M.pk|
         = |D| / |M|
{noformat}
It also works if we filter on both sides (which Impala does implicitly):
{noformat}
SELECT ...
FROM master m, detail d
WHERE m.pk = d.fk
  AND m.pk = 10
  AND d.fk = 10
{noformat}
Some of the steps are the same as earlier, so let's focus on the join:
{noformat}
|D' ⋈ M'| = |D'| * |M'| / max(|D.fk'|, |M.pk'|)
          = (|D| / |D.fk|) * 1 / max(1, 1)
          = |D| / |M|
{noformat}
Now, suppose we place a broad filter on the master table:
{noformat}
SELECT ...
FROM master m, detail d
WHERE m.pk = d.fk
  AND m.balance > 1234
{noformat}
Let's apply the same math, assuming {{sel(m.balance > 1234) = 1/3}}.
{noformat}
|M'| = |M| * sel(m.balance > 1234)
     = |M| / 3

|M.pk'| = |M.pk| * sel(m.balance > 1234)
        = |M.pk| / 3
        = |M'|

|D ⋈ M'| = |D| * |M'| / max(|D.fk|, |M.pk'|)
         = |D| * (|M| / 3) / max(|D.fk|, |M.pk| / 3)
         = (|D| / 3) * |M| / |D.fk|
         = |D| / 3
{noformat}
Remembering, again, that {{|D.fk| = |M.pk| = |M|}}.

h4. Compound Keys with Correlated Filters

If the join key is compound (two or more keys), then we apply the selectivity either to each key as we multiply them or to the final product:
{noformat}
|T.k'| = ∏ |T.ki| * sel(T)
       = sel(T) * ∏ |T.ki|
{noformat}
That is, we assume that the filter reduces the cardinality of all keys equally. This may require more thought to verify.

The key cardinality is available in metadata. The predicate selectivity is not currently available. Since the scan node already computes it in order to derive the scan cardinality, the scan node can simply save the selectivity for use by the join.
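As a rough check on the arithmetic in this section, the following Python sketch applies the selectivity to the key NDV and evaluates Equation 7 for the point filter and the broad filter. The function names and the table sizes (|M| = 900, |D| = 9000) are illustrative assumptions, not Impala code; {{Fraction}} is used only to keep the 1/3 selectivity exact.

```python
from fractions import Fraction
from math import prod


def filtered_key_ndv(column_ndvs, selectivity):
    """|T.k'| = sel(T) * prod(|T.ki|): scale the key NDV by the scan selectivity."""
    return selectivity * prod(column_ndvs)


def join_cardinality(left_rows, right_rows, left_key_ndv, right_key_ndv):
    """Equation 7: |D'| * |M'| / max(|D.fk'|, |M.pk'|)."""
    return (Fraction(left_rows) * Fraction(right_rows)
            / Fraction(max(left_key_ndv, right_key_ndv)))


# Hypothetical sizes, with |D.fk| = |M.pk| = |M| as in the text.
M_ROWS, D_ROWS, KEY_NDV = 900, 9000, 900

# Point filter m.pk = 10: sel = 1/|M.pk|, so |M'| = 1 and |M.pk'| = 1.
sel_point = Fraction(1, KEY_NDV)
point = join_cardinality(D_ROWS, M_ROWS * sel_point,
                         KEY_NDV, filtered_key_ndv([KEY_NDV], sel_point))

# Broad filter m.balance > 1234 with an assumed selectivity of 1/3.
sel_broad = Fraction(1, 3)
broad = join_cardinality(D_ROWS, M_ROWS * sel_broad,
                         KEY_NDV, filtered_key_ndv([KEY_NDV], sel_broad))

print(point, D_ROWS // M_ROWS)   # both are |D| / |M|
print(broad, D_ROWS // 3)        # both are |D| / 3
```

With these numbers the point filter yields |D| / |M| rows and the broad filter yields |D| / 3 rows, matching the derivations above.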
Plugging this into Equation 6:
{noformat}
            |L'| * |R'|
|L' ⋈ R'| = -----------------------------------------    [Equation 8]
            max(sel(L) * ∏ |L.ki|, sel(R) * ∏ |R.ki|)
{noformat}
With that, it is a simple matter to compute the join cardinality.

h4. Generalizing the Correlated Filter Case

A complication then occurs. The next join will have the present join as one of its input relations. What selectivity should this join report?

We need the following:
* Table cardinalities: {{|L|}} and {{|R|}}, which can be base tables or joins.
* The local predicates for each table, along with their selectivities {{sel(L)}} and {{sel(R)}}.
* The NDV values for the join predicates {{|L.k1|}} and {{|R.k2|}}.

From these we can work out the math. As it turns out, the scan nodes already compute the scan cardinalities for us: {{|L'|}} and {{|R'|}}. The join must work out the other terms.

The key cardinalities are worked out by applying the scan predicate selectivity to the join key NDVs:
{noformat}
|T.k'| = |T.k| * sel(T)

            |D'| * |M'|
|D' ⋈ M'| = ---------------------
            max(|D.fk'|, |M.pk'|)

            |D| * |M|
sel(join) = ---------
            |D' ⋈ M'|

|D'| = |D| * sel(D)
|M'| = |M| * sel(M)
{noformat}
The top term then becomes:
{noformat}
|D| * |M| = (|D'| / sel(D)) * (|M'| / sel(M))
{noformat}
So the whole equation is:
{noformat}
sel(join) = (|D'| / sel(D)) * (|M'| / sel(M)) / |D' ⋈ M'|
{noformat}
Or
{noformat}
            |D'| * |M'|
sel(join) = ---------------------------    [Equation 9]
            |D' ⋈ M'| * sel(D) * sel(M)
{noformat}
All the above terms are available in the planner. The selectivity computed above then becomes the input to the next join.

h4. M:N (Generic) Case

Join discussions are more intuitive when discussed in the M:1 (FK/PK) case. But the math works just as well for the M:N (generic) case.

Assume a join of two tables, left ({{L}}) and right ({{R}}), with no predicate.
We have a Cartesian product:
{noformat}
|L ⋈ R| = |L| * |R|
{noformat}
Suppose we have an equi-join predicate: {{L.k1 = R.k2}}. We can now use a hash join in which L is on the probe side and R is on the build side.

Because we are concerned with the M:N (generic) case, we assume that {{|R.k2| < |R|}}. This means that multiple build rows have the same key, say {{R.k2 = x}}. This then implies that each row of the L (probe) side will potentially match multiple rows on the R (build) side.

We want to know: how many rows will each probe-side row match?

Let’s focus on a table-to-table join and assume we can obtain the following from HMS:
* Probe and build table cardinalities: {{|L|}} and {{|R|}}
* Key cardinalities (NDVs): {{|L.k1|}} and {{|R.k2|}}

The probe side will match rows on the build side where {{R 𝜎 R.k2 = L.k1}}. Using the uniformity assumption from the S&S paper (see IMPALA-8014) we can see that the values of R.k2 divide the R table into a set of {{|R.k2|}} groups, each of which must have size:
{noformat}
|R'| = |R| / |R.k2|
{noformat}
Let’s assume that every row on the L side matches some row on the R side. Then, the join cardinality is just:
{noformat}
|L ⋈ R| = |L| * |R| / |R.k2|
{noformat}
Both the L and R tables may be subject to selection during scan (see IMPALA-8014) that is an unbiased sampling (given the uniformity assumption) of the rows of both tables. This reduces the rows available to join, but does not reduce the population of groups from which the sample is drawn. So:
{noformat}
|L' ⋈ R'| = |L'| * |R'| / |R.k2|
{noformat}
Intuitively, however many rows are scanned, they are still divided into the same set of groups.

The above assumes that all rows from L match rows in R. (This is called the “containment assumption” in the S&S paper.) But, Big Data is messy. Perhaps there are more key values in L than R or vice versa.
We can make some reasonable assumptions:
* If there are fewer values in L.k1 than in R.k2, we can assume all probe rows will match a build key.
* If there are more values in L.k1 than in R.k2, we can assume we'll match all keys on the build side, then discard the extra probe values that don't match.

Again using the uniformity assumption, the probability is simply the ratio of the number of keys available for matching (the right or build side) divided by the number of keys we want to match (the left or probe side):
{noformat}
p(match) = / |R.k2| / |L.k1|   if |L.k1| > |R.k2|,
           \ 1                 otherwise
         = |R.k2| / max( |L.k1|, |R.k2| )
{noformat}
Let's check.
* If the NDVs are equal, the probability of a match is 1.
* If either table is empty, the probability is 0.
* If the number of probe keys ({{|L.k1|}}) is half the number of build keys ({{|R.k2|}}) then all probe rows will find a match, so the probability is 1 (though half of the build side rows will go unmatched.)
* If we have twice as many probe keys ({{|L.k1|}}) as build keys ({{|R.k2|}}) then half the probe rows won’t find a match and the probability of a match is 0.5.

All good.

Putting it all together:
{noformat}
|L' ⋈ R'| = |L'| * |R'| / |R.k2| * p(match)
          = (|L'| * |R'| / |R.k2|) * |R.k2| / max(|R.k2|, |L.k1|)
          = |L'| * |R'| / max(|R.k2|, |L.k1|)
{noformat}
Rearranging terms, we get the M:N cardinality estimation expression:
{noformat}
            |L'| * |R'|
|L' ⋈ R'| = -------------------
            max(|L.k1|, |R.k2|)
{noformat}
As it turns out, this is exactly Equation 2 in the S&S paper which provides confirmation that the derivation is correct. It is also the same (except for names) as Equation 5 above, showing that the M:1 and M:N cases are the same mathematically.

Said another way, as the M:N groups on the "N" side get ever smaller, they will converge on a group size of 1, which is the M:1 case.
So, the M:1 (FK/PK) formula *must* be a special case of the more generic M:N (generic) formula.

h4. Proposed Fix

All of this leads to the proposed fix. To gather the details from the above discussion, modify the planner as follows:
* Add node selectivity as state on each node that can be obtained by the join node.
* For the Scan node, retain the selectivity already computed.
* For the Join node, compute selectivity as shown below.
* Modify the Join node to compute join cardinality using the equations below.

We assume the following are available from the input nodes:
* {{|T'|}}, the node output cardinality
* {{sel(T)}}, the selectivity of that node

{noformat}
|L.k'| = min(sel(L) * ∏ |L.ki'|, |L'|)
|R.k'| = min(sel(R) * ∏ |R.ki'|, |R'|)

            |L'| * |R'|
|L' ⋈ R'| = -------------------
            max(|L.k'|, |R.k'|)

            |L'| * |R'|
sel(join) = ---------------------------
            |L' ⋈ R'| * sel(L) * sel(R)
{noformat}
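The proposed-fix equations can be sketched as a small Python model. This is not the Impala planner (which is Java): the {{NodeStats}} class and the {{key_ndv}} and {{estimate_join}} functions are hypothetical names invented for this illustration. The sketch assumes non-empty inputs with non-zero selectivity, takes the per-column key NDVs as plain numbers, and computes {{sel(join)}} exactly as the equation is written.

```python
from dataclasses import dataclass
from math import prod


@dataclass
class NodeStats:
    """State a plan node would expose to its parent join (hypothetical)."""
    cardinality: float  # |T'|, the node output cardinality
    selectivity: float  # sel(T), the selectivity of that node


def key_ndv(stats, column_ndvs):
    """|T.k'| = min(sel(T) * prod(|T.ki|), |T'|)."""
    return min(stats.selectivity * prod(column_ndvs), stats.cardinality)


def estimate_join(left, left_ndvs, right, right_ndvs):
    """Return the join's own NodeStats from its two input nodes."""
    denominator = max(key_ndv(left, left_ndvs), key_ndv(right, right_ndvs))
    rows = left.cardinality * right.cardinality
    join_rows = rows / denominator
    # sel(join) = |L'| * |R'| / (|L' join R'| * sel(L) * sel(R))
    join_sel = rows / (join_rows * left.selectivity * right.selectivity)
    return NodeStats(join_rows, join_sel)


# Made-up example: |D| = 10000 unfiltered, |M| = 1000 filtered to half,
# with |D.fk| = |M.pk| = 1000.
detail = NodeStats(cardinality=10_000, selectivity=1.0)
master = NodeStats(cardinality=500, selectivity=0.5)
joined = estimate_join(detail, [1000], master, [1000])
print(joined)
```

Returning a {{NodeStats}} from the join is what lets the result feed the next join up the plan, which is the point of keeping selectivity as node state.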