[ 
https://issues.apache.org/jira/browse/FLINK-15421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Benchao Li updated FLINK-15421:
-------------------------------
    Description: 
`TimestampType` has two physical representations: `Timestamp` and 
`LocalDateTime`. When we use the following SQL, the two representations 
conflict with each other:
{quote}SELECT 
 SUM(cnt) as s, 
 MAX(ts)
 FROM 
 (SELECT 
 `string`,
 `int`,
 COUNT(*) AS cnt,
 MAX(rowtime) as ts
 FROM T1
 GROUP BY `string`, `int`, TUMBLE(rowtime, INTERVAL '10' SECOND))
 GROUP BY `string`
{quote}
with 'table.exec.emit.early-fire.enabled' = true.

The exception is below:
{quote}Caused by: java.lang.ClassCastException: java.time.LocalDateTime cannot 
be cast to java.sql.Timestamp
 at GroupAggsHandler$83.getValue(GroupAggsHandler$83.java:529)
 at 
org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:164)
 at 
org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43)
 at 
org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
 at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
 at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
 at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
 at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
 at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:488)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:702)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:527)
 at java.lang.Thread.run(Thread.java:748)
{quote}
I also created a unit test to quickly reproduce this bug in `WindowAggregateITCase`:

 

@Test
 def testEarlyFireWithTumblingWindow(): Unit = {
 val stream = failingDataSource(data)
 .assignTimestampsAndWatermarks(
 new TimestampAndWatermarkWithOffset
 [(Long, Int, Double, Float, BigDecimal, String, String)](10L))
 val table = stream.toTable(tEnv,
 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string, 'name)
 tEnv.registerTable("T1", table)
 
tEnv.getConfig.getConfiguration.setBoolean("table.exec.emit.early-fire.enabled",
 true)
 tEnv.getConfig.getConfiguration.setString("table.exec.emit.early-fire.delay", 
"1000 ms")

val sql =
 """
 |SELECT
 | SUM(cnt) as s,
 | MAX(ts)
 |FROM
 | (SELECT
 | `string`,
 | `int`,
 | COUNT(*) AS cnt,
 | MAX(rowtime) as ts
 | FROM T1
 | GROUP BY `string`, `int`, TUMBLE(rowtime, INTERVAL '10' SECOND))
 |GROUP BY `string`
 |""".stripMargin

tEnv.sqlQuery(sql).toRetractStream[Row].print()
 env.execute()
 }

 
  

  was:
`TimestmapType` has two types of physical representation: `Timestamp` and 
`LocalDateTime`. When we use following SQL, it will conflict each other:
{quote}SELECT 
 SUM(cnt) as s, 
 MAX(ts)
 FROM 
 SELECT 
 `string`,
 `int`,
 COUNT * AS cnt,
 MAX(rowtime) as ts
 FROM T1
 GROUP BY `string`, `int`, TUMBLE(rowtime, INTERVAL '10' SECOND)
 GROUP BY `string`
{quote}
with 'table.exec.emit.early-fire.enabled' = true.

The exceptions is below:
{quote}Caused by: java.lang.ClassCastException: java.time.LocalDateTime cannot 
be cast to java.sql.Timestamp
 at GroupAggsHandler$83.getValue(GroupAggsHandler$83.java:529)
 at 
org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:164)
 at 
org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43)
 at 
org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
 at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
 at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
 at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
 at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
 at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:488)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:702)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:527)
 at java.lang.Thread.run(Thread.java:748)
{quote}
I also create a UT to quickly reproduce this bug in `WindowAggregateITCase`:

@Test
 def testEarlyFireWithTumblingWindow(): Unit = {
 val stream = failingDataSource(data)
 .assignTimestampsAndWatermarks(
 new TimestampAndWatermarkWithOffset
 [(Long, Int, Double, Float, BigDecimal, String, String)](10L))
 val table = stream.toTable(tEnv,
 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string, 'name)
 tEnv.registerTable("T1", table)
 
tEnv.getConfig.getConfiguration.setBoolean("table.exec.emit.early-fire.enabled",
 true)
 tEnv.getConfig.getConfiguration.setString("table.exec.emit.early-fire.delay", 
"1000 ms")

val sql =
 """
 |SELECT
 | SUM(cnt) as s,
 | MAX(ts)
 |FROM
 | (SELECT
 | `string`,
 | `int`,
 | COUNT(*) AS cnt,
 | MAX(rowtime) as ts
 | FROM T1
 | GROUP BY `string`, `int`, TUMBLE(rowtime, INTERVAL '10' SECOND))
 |GROUP BY `string`
 |""".stripMargin

tEnv.sqlQuery(sql).toRetractStream[Row].print()
 env.execute()
 }

 
  


> GroupAggsHandler throws java.time.LocalDateTime cannot be cast to 
> java.sql.Timestamp
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-15421
>                 URL: https://issues.apache.org/jira/browse/FLINK-15421
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.9.1, 1.10.0
>            Reporter: Benchao Li
>            Priority: Major
>
> `TimestampType` has two physical representations: `Timestamp` and 
> `LocalDateTime`. When we use the following SQL, the two representations 
> conflict with each other:
> {quote}SELECT 
>  SUM(cnt) as s, 
>  MAX(ts)
>  FROM 
>  (SELECT 
>  `string`,
>  `int`,
>  COUNT(*) AS cnt,
>  MAX(rowtime) as ts
>  FROM T1
>  GROUP BY `string`, `int`, TUMBLE(rowtime, INTERVAL '10' SECOND))
>  GROUP BY `string`
> {quote}
> with 'table.exec.emit.early-fire.enabled' = true.
> The exception is below:
> {quote}Caused by: java.lang.ClassCastException: java.time.LocalDateTime 
> cannot be cast to java.sql.Timestamp
>  at GroupAggsHandler$83.getValue(GroupAggsHandler$83.java:529)
>  at 
> org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:164)
>  at 
> org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43)
>  at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
>  at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
>  at 
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
>  at 
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
>  at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
>  at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:488)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:702)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:527)
>  at java.lang.Thread.run(Thread.java:748)
> {quote}
> I also created a unit test to quickly reproduce this bug in `WindowAggregateITCase`:
>  
> @Test
>  def testEarlyFireWithTumblingWindow(): Unit = {
>  val stream = failingDataSource(data)
>  .assignTimestampsAndWatermarks(
>  new TimestampAndWatermarkWithOffset
>  [(Long, Int, Double, Float, BigDecimal, String, String)](10L))
>  val table = stream.toTable(tEnv,
>  'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string, 'name)
>  tEnv.registerTable("T1", table)
>  
> tEnv.getConfig.getConfiguration.setBoolean("table.exec.emit.early-fire.enabled",
>  true)
>  
> tEnv.getConfig.getConfiguration.setString("table.exec.emit.early-fire.delay", 
> "1000 ms")
> val sql =
>  """
>  |SELECT
>  | SUM(cnt) as s,
>  | MAX(ts)
>  |FROM
>  | (SELECT
>  | `string`,
>  | `int`,
>  | COUNT(*) AS cnt,
>  | MAX(rowtime) as ts
>  | FROM T1
>  | GROUP BY `string`, `int`, TUMBLE(rowtime, INTERVAL '10' SECOND))
>  |GROUP BY `string`
>  |""".stripMargin
> tEnv.sqlQuery(sql).toRetractStream[Row].print()
>  env.execute()
>  }
>  
>   



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to