[ 
https://issues.apache.org/jira/browse/FLINK-14946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jing Zhang updated FLINK-14946:
-------------------------------
    Description: 
The retraction rules can produce a bad plan in some cases. The case is 
simplified to the following SQL; the complete test case is in the attachments.

{code:scala}
  val join_sql =
      """
        |SELECT
        |  ll.a AS a,
        |  ll.b AS b,
        |  cnt
        |FROM (
        | SELECT a, b, COUNT(c) AS cnt FROM l GROUP BY a, b
        |) ll
        |JOIN (
        | SELECT a, b FROM r GROUP BY a, b
        |) rr ON
        |(ll.a = rr.a AND ll.b = rr.b)
      """.stripMargin

    val sqlQuery =
      s"""
         |SELECT a, b_1, SUM(cnt) AS cnt
         |FROM (
         | SELECT *, b AS b_1 FROM (${join_sql})
         |   UNION ALL
         | SELECT *, 'SEA' AS b_1 FROM (${join_sql})
         |) AS total_result
         |GROUP BY a, b_1
      """.stripMargin
{code}

The plan is:
 !image-2019-11-26-14-54-34-797.png! 
After retraction inference, both join nodes in the plan above are expected to 
have `AccRetract` as their AccMode. However, the AccMode of Join1 is correct, 
while the AccMode of Join2 is not.

I found that `SetAccModeRule` is never applied to Join2. Before actually 
applying `SetAccModeRule` to Join2, HepPlanner checks whether the vertex 
belongs to the DAG, and the result is false, so HepPlanner skips the rule for 
Join2.
 !screenshot-1.png! 

Here is the detailed sequence: 
1. Join2 matches `SetUpdatesAsRetractionRule`, which produces an equivalent 
Join node (called Join2') whose new children have UpdateAsRetractionTrait set 
to true.
2. The new right child of Join2, an Exchange, matches 
`SetUpdatesAsRetractionRule`, which produces an equivalent Exchange node 
(called Exchange-right') whose new inputs have UpdateAsRetractionTrait set to 
true.
3. The new left child of Join2 matches `SetUpdatesAsRetractionRule` and, as in 
step 2, produces an equivalent node (called Exchange-left').
4. Join1 matches `SetUpdatesAsRetractionRule`, which produces an equivalent 
Join node (called Join1'), as in step 1.
5. The new right child of Join1, an Exchange, matches 
`SetUpdatesAsRetractionRule`, which produces an equivalent Exchange node whose 
new inputs have UpdateAsRetractionTrait set to true. *However, HepPlanner 
finds that the digest of the new Exchange is the same as that of 
Exchange-right' from step 2*, so it does not create a new vertex and instead 
reuses the vertex that contains Exchange-right'.
6. The new left child of Join1, an Exchange, matches 
`SetUpdatesAsRetractionRule`, which produces an equivalent Exchange node whose 
new inputs have UpdateAsRetractionTrait set to true. *However, HepPlanner 
finds that the digest of the new Exchange is the same as that of 
Exchange-left' from step 3*, so it does not create a new vertex and instead 
reuses the vertex that contains Exchange-left'. In addition, HepPlanner 
replaces the input of the Exchange's parent (namely Join1) from the old 
Exchange to the new Exchange in the `contractVertices` method.
 !screenshot-2.png! 
 !screenshot-3.png! 
In `updateVertex`, Join1' puts newKey and its vertex into 
`mapDigestToVertex`. However, the digest of Join1' is exactly the same as the 
digest of Join2', and `mapDigestToVertex` already contains that key, mapped to 
the vertex that contains Join2'. *So the operation replaces the value of 
newKey in `mapDigestToVertex` from Join2' to Join1'.*
7. Join2' matches `SetAccModeRule`, which produces an equivalent Join node 
(called Join2'') with AccRetract as its AccMode. After applying the rule, 
HepPlanner runs `collectGarbage`. Join2' is added to sweepSet because it is no 
longer reachable from the root, so the entry for Join2' is removed from 
`mapDigestToVertex`. Because Join1' shares that digest, its key is removed as 
well.

 !screenshot-4.png! 
 !screenshot-5.png! 
8. Join1' matches `SetAccModeRule`, but HepPlanner considers that Join1' does 
not belong to the DAG because `mapDigestToVertex` no longer contains the key 
of Join1'.
 !screenshot-6.png! 
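
The key collision in steps 6 to 8 can be reproduced with a small toy model. 
This is only a sketch of the behaviour described above, not Calcite's actual 
code: `Vertex`, `DigestCollisionDemo` and the digest string are hypothetical, 
and `belongsToDag` and `collectGarbage` are simplified stand-ins that assume 
the membership check only looks up the digest and that the sweep removes 
entries by digest.

```scala
import scala.collection.mutable

// Toy stand-in for a planner vertex (hypothetical, not HepRelVertex).
final class Vertex(val name: String, val digest: String)

object DigestCollisionDemo {
  // Stand-in for HepPlanner's mapDigestToVertex.
  val mapDigestToVertex = mutable.Map.empty[String, Vertex]

  // Assumption: membership only asks whether the digest is registered.
  def belongsToDag(v: Vertex): Boolean = mapDigestToVertex.contains(v.digest)

  // Assumption: the sweep drops the digest of every unreachable vertex.
  def collectGarbage(unreachable: Seq[Vertex]): Unit =
    unreachable.foreach(v => mapDigestToVertex.remove(v.digest))

  // Replays steps 1, 6, 7 and 8 and reports whether Join1' is still in the DAG.
  def run(): Boolean = {
    mapDigestToVertex.clear()
    val sharedDigest = "Join(Exchange-left', Exchange-right')"
    val join2 = new Vertex("Join2'", sharedDigest)
    val join1 = new Vertex("Join1'", sharedDigest)
    mapDigestToVertex.put(join2.digest, join2) // step 1: Join2' registered
    mapDigestToVertex.put(join1.digest, join1) // step 6: same key, now Join1'
    collectGarbage(Seq(join2))                 // step 7: Join2' is swept
    belongsToDag(join1)                        // step 8: lookup for Join1'
  }
}
```

In this model `DigestCollisionDemo.run()` returns false: sweeping Join2' also 
removes the only key under which Join1' was registered.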

So there are two possible fixes:
1. The root cause is a flaw in HepPlanner. `collectGarbage` should not simply 
remove keys from `mapDigestToVertex` for nodes that are not reachable from the 
root. It could first check whether the same key also belongs to a node that is 
still reachable from the root.
2. We could also avoid the bug by changing the `HepMatchOrder` of the 
HepPlanner that contains `SetUpdatesAsRetractionRule` from `BOTTOM_UP` to 
`TOP_DOWN`.

I'm not sure whether either of these solutions is good. Is there a better one?
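
One possible reading of suggestion 1, sketched on a toy model rather than on 
Calcite's real `collectGarbage`: skip the removal when the digest of an 
unreachable node is also the digest of a node that is still reachable. The 
names `Node`, `GuardedSweep` and `sweep` are hypothetical and exist only for 
this illustration.

```scala
import scala.collection.mutable

// Toy stand-in for a planner vertex (hypothetical, not HepRelVertex).
final class Node(val name: String, val digest: String)

object GuardedSweep {
  // Removes digests of unreachable nodes, but keeps any digest that a
  // reachable node still uses.
  def sweep(
      digestMap: mutable.Map[String, Node],
      unreachable: Seq[Node],
      reachable: Seq[Node]): Unit = {
    val liveDigests = reachable.map(_.digest).toSet
    unreachable.foreach { n =>
      if (!liveDigests.contains(n.digest)) digestMap.remove(n.digest)
    }
  }

  // Replays the Join1'/Join2' scenario with the guarded sweep.
  def run(): Boolean = {
    val shared = "Join(Exchange-left', Exchange-right')"
    val join2 = new Node("Join2'", shared)
    val join1 = new Node("Join1'", shared)
    val digestMap = mutable.Map[String, Node](shared -> join1)
    sweep(digestMap, unreachable = Seq(join2), reachable = Seq(join1))
    digestMap.contains(join1.digest)
  }
}
```

With this guard, `GuardedSweep.run()` returns true in the toy model, because 
the key shared by Join1' and Join2' survives the sweep of Join2'. Whether such 
a check is cheap enough inside the real planner is left open here.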

  was:
The retraction rules can produce a bad plan in some cases. The case is 
simplified to the following SQL; the complete test case is in the attachments.

{code:scala}
  val join_sql =
      """
        |SELECT
        |  ll.a AS a,
        |  ll.b AS b,
        |  cnt
        |FROM (
        | SELECT a, b, COUNT(c) AS cnt FROM l GROUP BY a, b
        |) ll
        |JOIN (
        | SELECT a, b FROM r GROUP BY a, b
        |) rr ON
        |(ll.a = rr.a AND ll.b = rr.b)
      """.stripMargin

    val sqlQuery =
      s"""
         |SELECT a, b_1, SUM(cnt) AS cnt
         |FROM (
         | SELECT *, b AS b_1 FROM (${join_sql})
         |   UNION ALL
         | SELECT *, 'SEA' AS b_1 FROM (${join_sql})
         |) AS total_result
         |GROUP BY a, b_1
      """.stripMargin
{code}

The plan is:
 !image-2019-11-26-14-54-34-797.png! 
After retraction inference, both join nodes in the plan above are expected to 
have `AccRetract` as their AccMode. However, the AccMode of Join1 is correct, 
while the AccMode of Join2 is not.

I found that in HepPlanner, before actually applying `SetAccModeRule` to 
Join2, HepPlanner checks whether the vertex belongs to the DAG, and the result 
is false. So `SetAccModeRule` is not actually applied to Join2.
 !screenshot-1.png! 


> Retraction infer would result in bad plan under corner case in blink planner
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-14946
>                 URL: https://issues.apache.org/jira/browse/FLINK-14946
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.9.0, 1.9.1
>            Reporter: Jing Zhang
>            Priority: Major
>         Attachments: RetractionRules1Test.scala, 
> image-2019-11-26-14-54-34-797.png, screenshot-1.png, screenshot-2.png, 
> screenshot-3.png, screenshot-4.png, screenshot-5.png, screenshot-6.png
>


