[ https://issues.apache.org/jira/browse/FLINK-21553?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17294981#comment-17294981 ]

Andy commented on FLINK-21553:
------------------------------

[~jark] [~dwysakowicz] [~guoweima] Currently, when flushing the buffer to state,
`CombineRecordsFunction` copies only the window key, because the window key is
reused. However, it does not copy the record, which is also reused. I think this
is the root cause of the failure. In the failing case, the SQL contains
`count(distinct distinctKey)`, and distinctKey is the user key (UK) of the
MapState. If the object is put into state directly when the buffer is flushed,
it may be modified after it has been stored in the HeapStateBackend, because it
is a reused object in `AbstractBytesMultiMap`.
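A minimal, self-contained Scala sketch of the reuse hazard described above, assuming a heap-based state backend stores keys by reference. `ReusedRecord`, `ObjectReuseDemo` and `flush` are hypothetical illustrations, not Flink's `CombineRecordsFunction`, `AbstractBytesMultiMap` or `MapState` APIs; a plain `mutable.HashMap` stands in for the heap-backed MapState.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a reused record: one mutable instance that the
// buffer overwrites in place for every entry it iterates over.
final class ReusedRecord(var value: String) {
  def copy(): ReusedRecord = new ReusedRecord(value)

  override def equals(o: Any): Boolean = o match {
    case r: ReusedRecord => java.util.Objects.equals(r.value, value)
    case _               => false
  }

  override def hashCode(): Int = java.util.Objects.hashCode(value)
}

object ObjectReuseDemo {
  // Flushes distinct keys into a heap "MapState" (a HashMap holding references).
  // With copyRecord = false, every stored key is the same reused instance.
  def flush(values: Seq[String], copyRecord: Boolean): mutable.HashMap[ReusedRecord, Long] = {
    val reused = new ReusedRecord(null)
    val state = mutable.HashMap.empty[ReusedRecord, Long]
    for (v <- values) {
      reused.value = v // the buffer overwrites the shared instance
      val key = if (copyRecord) reused.copy() else reused
      state.update(key, state.getOrElse(key, 0L) + 1L)
    }
    state
  }

  def main(args: Array[String]): Unit = {
    val input = Seq("Hi", "Comment#1", "Hello")
    // Without a copy, all stored keys point at one object holding the last value.
    println(flush(input, copyRecord = false).keys.map(_.value).toSet)
    // With a copy, each distinct value keeps its own key object.
    println(flush(input, copyRecord = true).keys.map(_.value).toSet)
  }
}
```

Copying the record before handing it to state gives each stored key its own object, so later overwrites of the reused buffer instance can no longer change what is already in state.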

{code:java}
@Test
def testHopWindow_Cube(): Unit = {
  System.setProperty("org.codehaus.janino.source_debugging.enable", "true")
  System.setProperty("org.codehaus.janino.source_debugging.dir",
    "/Users/zhangjing/IdeaProjects/flink/flink-table/flink-table-planner-blink/src/main/java")
  val inputData: Seq[Row] = List(
    row("2020-10-10 00:00:01", "Hi", "a"),
    row("2020-10-10 00:00:03", "Comment#1", "a"),
    row("2020-10-10 00:00:04", null, "a"),

    row("2020-10-10 00:00:07", "Hello", "b"),
    row("2020-10-10 00:00:06", "Hi", "b"), // out of order
    row("2020-10-10 00:00:08", "Comment#2", "a")
  )
  val dataId = TestValuesTableFactory.registerData(inputData)
  tEnv.executeSql(
    s"""
       |CREATE TABLE T2 (
       | `ts` STRING,
       | `string` STRING,
       | `name` STRING,
       | `rowtime` AS TO_TIMESTAMP(`ts`),
       | WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND
       |) WITH (
       | 'connector' = 'values',
       | 'data-id' = '$dataId',
       | 'failing-source' = 'true'
       |)
       |""".stripMargin)
  val sql =
    """
      |SELECT
      |  GROUPING_ID(`name`),
      |  `name`,
      |  window_start,
      |  window_end,
      |  COUNT(DISTINCT `string`)
      |FROM TABLE(
      |   HOP(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND, INTERVAL '10' SECOND))
      |GROUP BY CUBE(`name`), window_start, window_end
    """.stripMargin

  val sink = new TestingAppendSink
  tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink)
  env.execute()

  val data = Seq(
    "0,a,2020-10-09T23:59:55,2020-10-10T00:00:05,2",
    "0,a,2020-10-10T00:00,2020-10-10T00:00:10,3",
    "0,a,2020-10-10T00:00:05,2020-10-10T00:00:15,1",
    "0,b,2020-10-10T00:00,2020-10-10T00:00:10,2",
    "0,b,2020-10-10T00:00:05,2020-10-10T00:00:15,2",
    "1,null,2020-10-09T23:59:55,2020-10-10T00:00:05,2",
    "1,null,2020-10-10T00:00,2020-10-10T00:00:10,4",
    "1,null,2020-10-10T00:00:05,2020-10-10T00:00:15,3"
  )
  assertEquals(
    data.sorted.mkString("\n"),
    sink.getAppendResults.sorted.mkString("\n"))
}
{code}
 

I simplified the failing case to the test above, which fails frequently on my
local machine (about 1 in every 3 to 4 runs with HeapStateBackend and
SplitDistinct set to false). When the test passes, the state in HeapStateBackend
looks like pic1; when it fails, the state looks like pic2 or pic3.
 
!image-2021-03-04-12-05-59-802.png|width=1308,height=292!
!image-2021-03-04-12-07-53-566.png|width=1420,height=361!
!image-2021-03-04-12-08-07-097.png|width=1297,height=211!
 
After copying the record, the test passes every time.

> WindowDistinctAggregateITCase#testHopWindow_Cube is unstable
> ------------------------------------------------------------
>
>                 Key: FLINK-21553
>                 URL: https://issues.apache.org/jira/browse/FLINK-21553
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>            Reporter: Jark Wu
>            Assignee: Andy
>            Priority: Blocker
>              Labels: test-stability
>             Fix For: 1.13.0
>
>         Attachments: image-2021-03-04-12-05-59-802.png, 
> image-2021-03-04-12-07-53-566.png, image-2021-03-04-12-08-07-097.png, 
> screenshot-1.png
>
>
> See 
> https://dev.azure.com/imjark/Flink/_build/results?buildId=422&view=logs&j=d1352042-8a7d-50b6-3946-a85d176b7981&t=b2322052-d503-5552-81e2-b3a532a1d7e8
>  !screenshot-1.png! 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
