[
https://issues.apache.org/jira/browse/FLINK-21553?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17294981#comment-17294981
]
Andy commented on FLINK-21553:
------------------------------
[~jark] [~dwysakowicz] [~guoweima] Currently, when flushing the buffer to state,
`CombineRecordsFunction` copies only the window key, because the window key is a
reused object. However, it does not copy the record, which is also reused. I
think this is the root cause of the failure. The failing case contains
`count(distinct distinctKey)` in the SQL, and distinctKey is the UK (user key)
of the MapState. If the object is put into state directly when the buffer is
flushed, it may be modified after it has been stored in HeapStateBackend,
because it is a reused object in `AbstractBytesMultiMap`.
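To illustrate the reuse problem outside of Flink, here is a minimal sketch in plain Java. It is not the actual `CombineRecordsFunction` code: `MutableRecord` and `flush` are hypothetical names, and a plain `HashMap` stands in for the heap state, which (like HeapStateBackend) keeps references to the objects it is given rather than serialized copies.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReusedRecordDemo {

    /** Hypothetical stand-in for a mutable row object that the buffer reuses. */
    static final class MutableRecord {
        String value;

        MutableRecord copy() {
            MutableRecord r = new MutableRecord();
            r.value = value;
            return r;
        }
    }

    /**
     * Writes every input into a heap-like map through one reused record
     * instance, then reads the stored values back in insertion order.
     */
    static List<String> flush(String[] inputs, boolean copyBeforePut) {
        Map<Integer, MutableRecord> state = new HashMap<>();
        MutableRecord reused = new MutableRecord();
        for (int i = 0; i < inputs.length; i++) {
            // The buffer overwrites the same instance for each entry.
            reused.value = inputs[i];
            // Without a copy, the map holds a reference to the reused instance.
            state.put(i, copyBeforePut ? reused.copy() : reused);
        }
        List<String> stored = new ArrayList<>();
        for (int i = 0; i < inputs.length; i++) {
            stored.add(state.get(i).value);
        }
        return stored;
    }

    public static void main(String[] args) {
        String[] inputs = {"Hi", "Comment#1", "Hello"};
        System.out.println(flush(inputs, false)); // [Hello, Hello, Hello]
        System.out.println(flush(inputs, true));  // [Hi, Comment#1, Hello]
    }
}
```

Without the copy, every stored entry points to the same instance, so later writes to the reused object silently change what is already in the map. Copying before the put keeps each stored entry independent.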
{code:java}
@Test
def testHopWindow_Cube(): Unit = {
System.setProperty("org.codehaus.janino.source_debugging.enable", "true")
System.setProperty("org.codehaus.janino.source_debugging.dir",
"/Users/zhangjing/IdeaProjects/flink/flink-table/flink-table-planner-blink/src/main/java")
val inputData: Seq[Row] = List(
row("2020-10-10 00:00:01", "Hi", "a"),
row("2020-10-10 00:00:03", "Comment#1", "a"),
row("2020-10-10 00:00:04", null, "a"),
row("2020-10-10 00:00:07", "Hello", "b"),
row("2020-10-10 00:00:06", "Hi", "b"), // out of order
row("2020-10-10 00:00:08", "Comment#2", "a")
)
val dataId = TestValuesTableFactory.registerData(inputData)
tEnv.executeSql(
s"""
|CREATE TABLE T2 (
| `ts` STRING,
| `string` STRING,
| `name` STRING,
| `rowtime` AS TO_TIMESTAMP(`ts`),
| WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND
|) WITH (
| 'connector' = 'values',
| 'data-id' = '$dataId',
| 'failing-source' = 'true'
|)
|""".stripMargin)
val sql =
"""
|SELECT
| GROUPING_ID(`name`),
| `name`,
| window_start,
| window_end,
| COUNT(DISTINCT `string`)
|FROM TABLE(
| HOP(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND, INTERVAL '10' SECOND))
|GROUP BY CUBE(`name`), window_start, window_end
""".stripMargin
val sink = new TestingAppendSink
tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink)
env.execute()
val data = Seq(
"0,a,2020-10-09T23:59:55,2020-10-10T00:00:05,2",
"0,a,2020-10-10T00:00,2020-10-10T00:00:10,3",
"0,a,2020-10-10T00:00:05,2020-10-10T00:00:15,1",
"0,b,2020-10-10T00:00,2020-10-10T00:00:10,2",
"0,b,2020-10-10T00:00:05,2020-10-10T00:00:15,2",
"1,null,2020-10-09T23:59:55,2020-10-10T00:00:05,2",
"1,null,2020-10-10T00:00,2020-10-10T00:00:10,4",
"1,null,2020-10-10T00:00:05,2020-10-10T00:00:15,3"
)
assertEquals(
data.sorted.mkString("\n"),
sink.getAppendResults.sorted.mkString("\n"))
}
{code}
I simplified the failing case to the test above, which fails frequently on my
local machine (about once every 3~4 runs, with HeapStateBackend and
SplitDistinct: false). When the test passes, the state in HeapStateBackend looks
like pic1; when the test fails, the state looks like pic2 or pic3.
!image-2021-03-04-12-05-59-802.png|width=1308,height=292!
!image-2021-03-04-12-07-53-566.png|width=1420,height=361!
!image-2021-03-04-12-08-07-097.png|width=1297,height=211!
After copying the record as well, the test always passes.
> WindowDistinctAggregateITCase#testHopWindow_Cube is unstable
> ------------------------------------------------------------
>
> Key: FLINK-21553
> URL: https://issues.apache.org/jira/browse/FLINK-21553
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Reporter: Jark Wu
> Assignee: Andy
> Priority: Blocker
> Labels: test-stability
> Fix For: 1.13.0
>
> Attachments: image-2021-03-04-12-05-59-802.png,
> image-2021-03-04-12-07-53-566.png, image-2021-03-04-12-08-07-097.png,
> screenshot-1.png
>
>
> See
> https://dev.azure.com/imjark/Flink/_build/results?buildId=422&view=logs&j=d1352042-8a7d-50b6-3946-a85d176b7981&t=b2322052-d503-5552-81e2-b3a532a1d7e8
> !screenshot-1.png!