[
https://issues.apache.org/jira/browse/FLINK-14946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jing Zhang updated FLINK-14946:
-------------------------------
Description:
Retractions rule would result in bad plan under some case, I simplify the case
like the following sql, complete test case could be found in attachments.
{code:scala}
val join_sql =
"""
|SELECT
| ll.a AS a,
| ll.b AS b,
| cnt
|FROM (
| SELECT a, b, COUNT(c) AS cnt FROM l GROUP BY a, b
|) ll
|JOIN (
| SELECT a, b FROM r GROUP BY a, b
|) rr ON
|(ll.a = rr.a AND ll.b = rr.b)
""".stripMargin !image-2019-11-26-14-52-52-824.png!
val sqlQuery =
s"""
|SELECT a, b_1, SUM(cnt) AS cnt
|FROM (
| SELECT *, b AS b_1 FROM (${join_sql})
| UNION ALL
| SELECT *, 'SEA' AS b_1 FROM (${join_sql})
|) AS total_result
|GROUP BY a, b_1
""".stripMargin
{code}
The plan is :
!image-2019-11-26-14-54-34-797.png!
After retraction infer, we expect two join node in the above plan has
`AccRetract` asAccMode. However, AccMode of Join1 is right, accMode of Join2 is
unexpected.
I find the `SetAccModeRule` never apply to Join2 because before actually apply
`SetAccModeRule` to Join2, HepPlanner would check if the vertex belongs to DAG
or not, and the result is false. So HepPlanner will not apply `SetAccModeRule`
to Join2.
!screenshot-1.png!
----
Here is detailed follow-up process:
1. Join2 matches `SetUpdatesAsRetractionRule`, results in an equivalent Join
node (called Join2') with new children which has UpdateAsRetractionTrait with
true flag
2. New right child of Join2, which is Exchange, matches
`SetUpdatesAsRetractionRule`, results in an equivalent Exchange node (called
Exchange-right') with new inputs which has UpdateAsRetractionTrait with true
flag
3. New left child of Join2 matches `SetUpdatesAsRetractionRule`, similar as
step2, generate an equivalent node called (called Exchange-left')
4. Join1 matches `SetUpdatesAsRetractionRule`, results in an equivalent Join
node (called Join1'), similar as step1
5. New right child of Join1, which is Exchange, matches
`SetUpdatesAsRetractionRule`, results in an equivalent Exchange node with new
inputs which has UpdateAsRetractionTrait with true flag, *however, HepPlanner
find digest of new Exchange is same as that Exchange-right' in step2*,
HepPlanner will not create new vertex, but reuse the vertex which contain
Exchange-right'
6. New left child of Join1, which is Exchange, matches
`SetUpdatesAsRetractionRule`, results in an equivalent Exchange node with new
inputs which has UpdateAsRetractionTrait with true flag, *however, HepPlanner
find digest of new Exchange is same as that Exchange-left' in step3*,
HepPlanner will not create new vertex, but reuse the vertex which contain
Exchange-left'. Besides, HepPlanner would replace inputs of parent of Exchange,
(namely Join1) from old Exchange to new Exchange in `contractVertices` methods.
!screenshot-2.png!
!screenshot-3.png!
In `updateVertex`, Join1' would put newKey and the vertex to
`mapDigestToVertex`. However Join1' digest is exactly same as Join2' digest.
Besides, `mapDigestToVertex` already contains same key with Vertex contains
Join2'. *So the operation would replace the value of newKey in
`mapDigestToVertex` from Join2' to Join1'*
7. Join1' matches `SetAccModeRule`, results in an equivalent Join node (called
Join1'') with AccRetract as AccMode. After apply the rule, HepPlanner starts
collectGarbage, Join1' would be added in sweepSet because it's not reachable
from root, so the entry related to Join1' would be removed in
`mapDigestToVertex`.
!screenshot-4.png!
!screenshot-5.png!
8. Join2' matches `SetAccModeRule`, however HepPlanner think Join2' does not
belong to DAG because `mapDigestToVertex` does not contain the key of Join2' .
!screenshot-6.png!
----
So Maybe there could be two suggestion
1. Root cause is drawback in HepPlanner. In `collectGarbage`, we could not
simply remove key from `mapDigestToVertex` for nodes which are not reachable
from root. Maybe we could check whether the key is same as that which is
reachable from root.
2. We could also avoid the bug by update `HepMatchOrder` of HepPlanner which
contains `SetUpdatesAsRetractionRule` from `BOTTOM_UP` to `TOP_DOWN`.
I'm not sure two above solution is reasonable. or is there better solution?
was:
Retractions rule would result in bad plan under some case, I simplify the case
like the following sql, complete test case could be found in attachments.
{code:scala}
val join_sql =
"""
|SELECT
| ll.a AS a,
| ll.b AS b,
| cnt
|FROM (
| SELECT a, b, COUNT(c) AS cnt FROM l GROUP BY a, b
|) ll
|JOIN (
| SELECT a, b FROM r GROUP BY a, b
|) rr ON
|(ll.a = rr.a AND ll.b = rr.b)
""".stripMargin !image-2019-11-26-14-52-52-824.png!
val sqlQuery =
s"""
|SELECT a, b_1, SUM(cnt) AS cnt
|FROM (
| SELECT *, b AS b_1 FROM (${join_sql})
| UNION ALL
| SELECT *, 'SEA' AS b_1 FROM (${join_sql})
|) AS total_result
|GROUP BY a, b_1
""".stripMargin
{code}
The plan is :
!image-2019-11-26-14-54-34-797.png!
After retraction infer, we expect two join node in the above plan has
`AccRetract` asAccMode. However, AccMode of Join1 is right, accMode of Join2 is
unexpected.
I find the `SetAccModeRule` never apply to Join2 because before actually apply
`SetAccModeRule` to Join2, HepPlanner would check if the vertex belongs to DAG
or not, and the result is false. So HepPlanner will not apply `SetAccModeRule`
to Join2.
!screenshot-1.png!
----
Here is detailed follow-up process:
1. Join2 matches `SetUpdatesAsRetractionRule`, results in an equivalent Join
node (called Join2') with new children which has UpdateAsRetractionTrait with
true flag
2. New right child of Join2, which is Exchange, matches
`SetUpdatesAsRetractionRule`, results in an equivalent Exchange node (called
Exchange-right') with new inputs which has UpdateAsRetractionTrait with true
flag
3. New left child of Join2 matches `SetUpdatesAsRetractionRule`, similar as
step2, generate an equivalent node called (called Exchange-left')
4. Join1 matches `SetUpdatesAsRetractionRule`, results in an equivalent Join
node (called Join1'), similar as step1
5. New right child of Join1, which is Exchange, matches
`SetUpdatesAsRetractionRule`, results in an equivalent Exchange node with new
inputs which has UpdateAsRetractionTrait with true flag, *however, HepPlanner
find digest of new Exchange is same as that Exchange-right' in step2*,
HepPlanner will not create new vertex, but reuse the vertex which contain
Exchange-right'
6. New left child of Join1, which is Exchange, matches
`SetUpdatesAsRetractionRule`, results in an equivalent Exchange node with new
inputs which has UpdateAsRetractionTrait with true flag, *however, HepPlanner
find digest of new Exchange is same as that Exchange-left' in step2*,
HepPlanner will not create new vertex, but reuse the vertex which contain
Exchange-left'. Besides, HepPlanner would replace inputs of parent of Exchange,
(namely Join1) from old Exchange to new Exchange in `contractVertices` methods.
!screenshot-2.png!
!screenshot-3.png!
In `updateVertex`, Join1' would put newKey and the vertex to
`mapDigestToVertex`. However Join1' digest is exactly same as Join2' digest.
Besides, `mapDigestToVertex` already contains same key with Vertex contains
Join2'. *So the operation would replace the value of newKey in
`mapDigestToVertex` from Join2' to Join1'*
7. Join1' matches `SetAccModeRule`, results in an equivalent Join node (called
Join1'') with AccRetract as AccMode. After apply the rule, HepPlanner starts
collectGarbage, Join1' would be added in sweepSet because it's not reachable
from root, so the entry related to Join1' would be removed in
`mapDigestToVertex`.
!screenshot-4.png!
!screenshot-5.png!
8. Join2' matches `SetAccModeRule`, however HepPlanner think Join2' does not
belong to DAG because `mapDigestToVertex` does not contain the key of Join2' .
!screenshot-6.png!
----
So Maybe there could be two suggestion
1. Root cause is drawback in HepPlanner. In `collectGarbage`, we could not
simply remove key from `mapDigestToVertex` for nodes which are not reachable
from root. Maybe we could check whether the key is same as that which is
reachable from root.
2. We could also avoid the bug by update `HepMatchOrder` of HepPlanner which
contains `SetUpdatesAsRetractionRule` from `BOTTOM_UP` to `TOP_DOWN`.
I'm not sure two above solution is reasonable. or is there better solution?
> Retraction infer would result in bad plan under corner case in blink planner
> ----------------------------------------------------------------------------
>
> Key: FLINK-14946
> URL: https://issues.apache.org/jira/browse/FLINK-14946
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.9.0, 1.9.1
> Reporter: Jing Zhang
> Priority: Major
> Attachments: RetractionRules1Test.scala,
> image-2019-11-26-14-54-34-797.png, screenshot-1.png, screenshot-2.png,
> screenshot-3.png, screenshot-4.png, screenshot-5.png, screenshot-6.png
>
>
> Retractions rule would result in bad plan under some case, I simplify the
> case like the following sql, complete test case could be found in attachments.
> {code:scala}
> val join_sql =
> """
> |SELECT
> | ll.a AS a,
> | ll.b AS b,
> | cnt
> |FROM (
> | SELECT a, b, COUNT(c) AS cnt FROM l GROUP BY a, b
> |) ll
> |JOIN (
> | SELECT a, b FROM r GROUP BY a, b
> |) rr ON
> |(ll.a = rr.a AND ll.b = rr.b)
> """.stripMargin !image-2019-11-26-14-52-52-824.png!
> val sqlQuery =
> s"""
> |SELECT a, b_1, SUM(cnt) AS cnt
> |FROM (
> | SELECT *, b AS b_1 FROM (${join_sql})
> | UNION ALL
> | SELECT *, 'SEA' AS b_1 FROM (${join_sql})
> |) AS total_result
> |GROUP BY a, b_1
> """.stripMargin
> {code}
> The plan is :
> !image-2019-11-26-14-54-34-797.png!
> After retraction infer, we expect two join node in the above plan has
> `AccRetract` asAccMode. However, AccMode of Join1 is right, accMode of Join2
> is unexpected.
> I find the `SetAccModeRule` never apply to Join2 because before actually
> apply `SetAccModeRule` to Join2, HepPlanner would check if the vertex belongs
> to DAG or not, and the result is false. So HepPlanner will not apply
> `SetAccModeRule` to Join2.
> !screenshot-1.png!
> ----
> Here is detailed follow-up process:
> 1. Join2 matches `SetUpdatesAsRetractionRule`, results in an equivalent Join
> node (called Join2') with new children which has UpdateAsRetractionTrait with
> true flag
> 2. New right child of Join2, which is Exchange, matches
> `SetUpdatesAsRetractionRule`, results in an equivalent Exchange node (called
> Exchange-right') with new inputs which has UpdateAsRetractionTrait with true
> flag
> 3. New left child of Join2 matches `SetUpdatesAsRetractionRule`, similar as
> step2, generate an equivalent node called (called Exchange-left')
> 4. Join1 matches `SetUpdatesAsRetractionRule`, results in an equivalent Join
> node (called Join1'), similar as step1
> 5. New right child of Join1, which is Exchange, matches
> `SetUpdatesAsRetractionRule`, results in an equivalent Exchange node with new
> inputs which has UpdateAsRetractionTrait with true flag, *however, HepPlanner
> find digest of new Exchange is same as that Exchange-right' in step2*,
> HepPlanner will not create new vertex, but reuse the vertex which contain
> Exchange-right'
> 6. New left child of Join1, which is Exchange, matches
> `SetUpdatesAsRetractionRule`, results in an equivalent Exchange node with
> new inputs which has UpdateAsRetractionTrait with true flag, *however,
> HepPlanner find digest of new Exchange is same as that Exchange-left' in
> step3*, HepPlanner will not create new vertex, but reuse the vertex which
> contain Exchange-left'. Besides, HepPlanner would replace inputs of parent of
> Exchange, (namely Join1) from old Exchange to new Exchange in
> `contractVertices` methods.
> !screenshot-2.png!
> !screenshot-3.png!
> In `updateVertex`, Join1' would put newKey and the vertex to
> `mapDigestToVertex`. However Join1' digest is exactly same as Join2' digest.
> Besides, `mapDigestToVertex` already contains same key with Vertex contains
> Join2'. *So the operation would replace the value of newKey in
> `mapDigestToVertex` from Join2' to Join1'*
> 7. Join1' matches `SetAccModeRule`, results in an equivalent Join node
> (called Join1'') with AccRetract as AccMode. After apply the rule, HepPlanner
> starts collectGarbage, Join1' would be added in sweepSet because it's not
> reachable from root, so the entry related to Join1' would be removed in
> `mapDigestToVertex`.
> !screenshot-4.png!
> !screenshot-5.png!
> 8. Join2' matches `SetAccModeRule`, however HepPlanner think Join2' does not
> belong to DAG because `mapDigestToVertex` does not contain the key of Join2' .
> !screenshot-6.png!
> ----
> So Maybe there could be two suggestion
> 1. Root cause is drawback in HepPlanner. In `collectGarbage`, we could not
> simply remove key from `mapDigestToVertex` for nodes which are not reachable
> from root. Maybe we could check whether the key is same as that which is
> reachable from root.
> 2. We could also avoid the bug by update `HepMatchOrder` of HepPlanner which
> contains `SetUpdatesAsRetractionRule` from `BOTTOM_UP` to `TOP_DOWN`.
> I'm not sure two above solution is reasonable. or is there better solution?
--
This message was sent by Atlassian Jira
(v8.3.4#803005)