[
https://issues.apache.org/jira/browse/FLINK-15421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Benchao Li updated FLINK-15421:
-------------------------------
Description:
`TimestampType` has two physical representations: `java.sql.Timestamp` and
`java.time.LocalDateTime`. When the following SQL is executed, the two representations conflict with each other:
{quote}SELECT
SUM(cnt) as s,
MAX(ts)
FROM
(SELECT
`string`,
`int`,
COUNT(*) AS cnt,
MAX(rowtime) as ts
FROM T1
GROUP BY `string`, `int`, TUMBLE(rowtime, INTERVAL '10' SECOND))
GROUP BY `string`
{quote}
with 'table.exec.emit.early-fire.enabled' set to true.
The exception is as follows:
{quote}Caused by: java.lang.ClassCastException: java.time.LocalDateTime cannot
be cast to java.sql.Timestamp
at GroupAggsHandler$83.getValue(GroupAggsHandler$83.java:529)
at
org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:164)
at
org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43)
at
org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:488)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:702)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:527)
at java.lang.Thread.run(Thread.java:748)
{quote}
I also wrote a unit test in `WindowAggregateITCase` that quickly reproduces this bug:
@Test
def testEarlyFireWithTumblingWindow(): Unit = {
val stream = failingDataSource(data)
.assignTimestampsAndWatermarks(
new TimestampAndWatermarkWithOffset
[(Long, Int, Double, Float, BigDecimal, String, String)](10L))
val table = stream.toTable(tEnv,
'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string, 'name)
tEnv.registerTable("T1", table)
tEnv.getConfig.getConfiguration.setBoolean("table.exec.emit.early-fire.enabled",
true)
tEnv.getConfig.getConfiguration.setString("table.exec.emit.early-fire.delay",
"1000 ms")
val sql =
"""
|SELECT
| SUM(cnt) as s,
| MAX(ts)
|FROM
| (SELECT
| `string`,
| `int`,
| COUNT(*) AS cnt,
| MAX(rowtime) as ts
| FROM T1
| GROUP BY `string`, `int`, TUMBLE(rowtime, INTERVAL '10' SECOND))
|GROUP BY `string`
|""".stripMargin
tEnv.sqlQuery(sql).toRetractStream[Row].print()
env.execute()
}
was:
`TimestampType` has two physical representations: `java.sql.Timestamp` and
`java.time.LocalDateTime`. When the following SQL is executed, the two representations conflict with each other:
{quote}SELECT
SUM(cnt) as s,
MAX(ts)
FROM
(SELECT
`string`,
`int`,
COUNT(*) AS cnt,
MAX(rowtime) as ts
FROM T1
GROUP BY `string`, `int`, TUMBLE(rowtime, INTERVAL '10' SECOND))
GROUP BY `string`
{quote}
with 'table.exec.emit.early-fire.enabled' set to true.
The exception is as follows:
{quote}Caused by: java.lang.ClassCastException: java.time.LocalDateTime cannot
be cast to java.sql.Timestamp
at GroupAggsHandler$83.getValue(GroupAggsHandler$83.java:529)
at
org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:164)
at
org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43)
at
org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:488)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:702)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:527)
at java.lang.Thread.run(Thread.java:748)
{quote}
I also wrote a unit test in `WindowAggregateITCase` that quickly reproduces this bug:
{{@Test}}
{{ def testEarlyFireWithTumblingWindow(): Unit = {}}
{{ val stream = failingDataSource(data)}}
{{ .assignTimestampsAndWatermarks(}}
{{ new TimestampAndWatermarkWithOffset}}
{{ [(Long, Int, Double, Float, BigDecimal, String, String)](10L))}}
{{ val table = stream.toTable(tEnv,}}
{{ 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string, 'name)}}
{{ tEnv.registerTable("T1", table)}}
{{
tEnv.getConfig.getConfiguration.setBoolean("table.exec.emit.early-fire.enabled",
true)}}
{{
tEnv.getConfig.getConfiguration.setString("table.exec.emit.early-fire.delay",
"1000 ms")}}{{val sql =}}
{{ """}}
{{ |SELECT}}
{{ | SUM(cnt) as s,}}
{{ | MAX(ts)}}
{{ |FROM}}
{{ | (SELECT}}
{{ | `string`,}}
{{ | `int`,}}
{{ | COUNT(*) AS cnt,}}
{{ | MAX(rowtime) as ts}}
{{ | FROM T1}}
{{ | GROUP BY `string`, `int`, TUMBLE(rowtime, INTERVAL '10' SECOND))}}
{{ |GROUP BY `string`}}
{{ |""".stripMargin}}{{tEnv.sqlQuery(sql).toRetractStream[Row].print()}}
{{ env.execute()}}
{{ }}}
> GroupAggsHandler throws java.time.LocalDateTime cannot be cast to
> java.sql.Timestamp
> ------------------------------------------------------------------------------------
>
> Key: FLINK-15421
> URL: https://issues.apache.org/jira/browse/FLINK-15421
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.9.1, 1.10.0
> Reporter: Benchao Li
> Priority: Major
>
> `TimestampType` has two physical representations: `java.sql.Timestamp` and
> `java.time.LocalDateTime`. When the following SQL is executed, the two representations conflict with each other:
> {quote}SELECT
> SUM(cnt) as s,
> MAX(ts)
> FROM
> (SELECT
> `string`,
> `int`,
> COUNT(*) AS cnt,
> MAX(rowtime) as ts
> FROM T1
> GROUP BY `string`, `int`, TUMBLE(rowtime, INTERVAL '10' SECOND))
> GROUP BY `string`
> {quote}
> with 'table.exec.emit.early-fire.enabled' set to true.
> The exception is as follows:
> {quote}Caused by: java.lang.ClassCastException: java.time.LocalDateTime
> cannot be cast to java.sql.Timestamp
> at GroupAggsHandler$83.getValue(GroupAggsHandler$83.java:529)
> at
> org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:164)
> at
> org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43)
> at
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
> at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
> at
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
> at
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:488)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:702)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:527)
> at java.lang.Thread.run(Thread.java:748)
> {quote}
> I also wrote a unit test in `WindowAggregateITCase` that quickly reproduces this bug:
> @Test
> def testEarlyFireWithTumblingWindow(): Unit = {
> val stream = failingDataSource(data)
> .assignTimestampsAndWatermarks(
> new TimestampAndWatermarkWithOffset
> [(Long, Int, Double, Float, BigDecimal, String, String)](10L))
> val table = stream.toTable(tEnv,
> 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string, 'name)
> tEnv.registerTable("T1", table)
>
> tEnv.getConfig.getConfiguration.setBoolean("table.exec.emit.early-fire.enabled",
> true)
>
> tEnv.getConfig.getConfiguration.setString("table.exec.emit.early-fire.delay",
> "1000 ms")
> val sql =
> """
> |SELECT
> | SUM(cnt) as s,
> | MAX(ts)
> |FROM
> | (SELECT
> | `string`,
> | `int`,
> | COUNT(*) AS cnt,
> | MAX(rowtime) as ts
> | FROM T1
> | GROUP BY `string`, `int`, TUMBLE(rowtime, INTERVAL '10' SECOND))
> |GROUP BY `string`
> |""".stripMargin
> tEnv.sqlQuery(sql).toRetractStream[Row].print()
> env.execute()
> }
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)