[
https://issues.apache.org/jira/browse/FLINK-19926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17350250#comment-17350250
]
Shuo Cheng edited comment on FLINK-19926 at 5/24/21, 5:56 AM:
--------------------------------------------------------------
Hi [~satyamshekhar], I've tested the case you presented above on Flink 1.11.1, and
the result looks correct. Add the following code to
`org.apache.flink.table.planner.runtime.stream.sql.WindowAggregateITCase` to
reproduce it. I think something else is affecting the result in your case.
Could you provide a complete and runnable test example?
{code:scala}
@Before
def setup(): Unit = {
  val data = List(
    rowOf(0L, LocalDateTime.parse("1970-01-01T00:00:00")),
    rowOf(1L, LocalDateTime.parse("1970-01-02T00:00:00"))
  )
  val dataId = TestValuesTableFactory.registerData(data)
  tEnv.executeSql(
    s"""
       |create table T0(
       |  amount bigint,
       |  rowtime timestamp(3),
       |  watermark for rowtime as rowtime
       |) with (
       |  'connector' = 'values',
       |  'data-id' = '$dataId'
       |)
       |""".stripMargin)
}

@Test
def testCascadingTumbleWindow1(): Unit = {
  val innerSql =
    """
      |create view CTE as
      |SELECT sum(`amount`) as _output, tumble_end(rowtime, interval '1' second) _dim0
      |FROM T0
      |GROUP BY TUMBLE(rowtime, INTERVAL '1' SECOND)
      |""".stripMargin
  tEnv.executeSql(innerSql)

  val sql =
    """
      |SELECT V0._output as V0_output, V1._output AS V1_output,
      |  V0._dim0 as V0_time, V1._dim0 as V1_time
      |FROM CTE as V0 INNER JOIN CTE V1 ON V0._dim0 = V1._dim0
      |""".stripMargin

  val sink = new TestingAppendSink
  tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink)
  env.execute()

  val expected = Seq(
    "0,0,1970-01-01T00:00:01,1970-01-01T00:00:01",
    "1,1,1970-01-02T00:00:01,1970-01-02T00:00:01")
  assertEquals(expected.sorted, sink.getAppendResults.sorted)
}
{code}
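For reference, the two `LocalDateTime` values in the test data above are just the epoch-millisecond timestamps from the issue description (0 and 86400000) rendered as TIMESTAMP(3) values; a minimal sketch of that conversion (plain JDK calls, UTC assumed, the helper name is illustrative only):
{code:scala}
import java.time.{Instant, LocalDateTime, ZoneOffset}

// Render the epoch-millisecond values from the issue description as the
// LocalDateTime values used in the test data (UTC is assumed here).
def toLocalDateTime(epochMillis: Long): LocalDateTime =
  LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), ZoneOffset.UTC)

toLocalDateTime(0L)        // 1970-01-01T00:00
toLocalDateTime(86400000L) // 1970-01-02T00:00
{code}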
> Wrong results for join post tumble grouping
> -------------------------------------------
>
> Key: FLINK-19926
> URL: https://issues.apache.org/jira/browse/FLINK-19926
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.11.1
> Environment: Flink version: 1.11.1
> Reporter: Satyam Shekhar
> Priority: Minor
> Labels: auto-deprioritized-major
>
> I have a table T0 with the following schema -
> {code:java}
> root
> |-- amount: BIGINT
> |-- timestamp: TIMESTAMP(3)
> {code}
>
> The table T0 has two rows -
> ||amount||timestamp (epoch millis)||
> |0|0|
> |1|86400000|
>
> The following query with tumble grouping returns the wrong result -
> {code:sql}
> WITH CTE AS (
>   SELECT SUM(amount) AS _output,
>          TUMBLE_END(`timestamp`, INTERVAL '1' SECOND) AS _dim0
>   FROM T0
>   GROUP BY TUMBLE(`timestamp`, INTERVAL '1' SECOND))
> SELECT V0._output AS V0_output, V1._output AS V1_output,
>        V0._dim0 AS V0_time, V1._dim0 AS V1_time
> FROM CTE AS V0 INNER JOIN CTE V1 ON V0._dim0 = V1._dim0
> {code}
>
> The returned result is -
> ||V0_output||V1_output||V0_time||V1_time||
> |1|1|86401000|86401000|
>
> The expected result is -
> ||V0_output||V1_output||V0_time||V1_time||
> |0|0|1000|1000|
> |1|1|86401000|86401000|
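> The expected `*_time` values are the 1-second window ends in epoch milliseconds (window end = window start + 1 second); a minimal sketch of that arithmetic (the `tumbleEndMillis` helper below is illustrative only, not Flink API):
> {code:scala}
> // End of the 1-second tumble window containing a given epoch-millisecond timestamp.
> // Illustrative helper only; assumes non-negative timestamps.
> def tumbleEndMillis(ts: Long, sizeMillis: Long = 1000L): Long =
>   ts / sizeMillis * sizeMillis + sizeMillis
>
> tumbleEndMillis(0L)        // 1000     -> row with amount 0, window [0, 1000)
> tumbleEndMillis(86400000L) // 86401000 -> row with amount 1, window [86400000, 86401000)
> {code}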
>
> Running the subquery for `CTE` on its own returns the correct result -
> {code:sql}
> SELECT SUM(amount) AS _output,
>        TUMBLE_END(`timestamp`, INTERVAL '1' SECOND) AS _dim0
> FROM T0
> GROUP BY TUMBLE(`timestamp`, INTERVAL '1' SECOND)
> {code}
>
> Result (this is correct) -
> ||_output||_dim0||
> |0|1000|
> |1|86401000|