[ 
https://issues.apache.org/jira/browse/FLINK-20131?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17336031#comment-17336031
 ] 

Flink Jira Bot commented on FLINK-20131:
----------------------------------------

This issue was labeled "stale-major" 7 ago and has not received any updates so 
it is being deprioritized. If this ticket is actually Major, please raise the 
priority and ask a committer to assign you the issue or revive the public 
discussion.


> Error when using LAST_VALUE on two different datatypes in same query with 
> over window
> -------------------------------------------------------------------------------------
>
>                 Key: FLINK-20131
>                 URL: https://issues.apache.org/jira/browse/FLINK-20131
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.12.0
>            Reporter: Thilo Schneider
>            Priority: Major
>              Labels: stale-major
>
> When using "LAST_VALUE(x) OVER w" on two different datatypes within the same 
> query, the application crashes with java.lang.Integer cannot be cast to 
> java.lang.Double (or similar, depending on used types).
> {code:python}
> #setup
> from pyflink.table import EnvironmentSettings, StreamTableEnvironment
> env_settings = 
> EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
> t_env = StreamTableEnvironment.create( environment_settings=env_settings)
> t_env.get_config().get_configuration().set_integer("table.exec.resource.default-parallelism",
>  1)
> t_env.execute_sql("""
> CREATE TABLE datagen (
>  foo INT,
>  val AS cast(foo AS double),
>  message_time AS to_timestamp(from_unixtime(foo)),
>  WATERMARK FOR message_time AS message_time
> ) WITH (
>  'connector' = 'datagen',
>  'rows-per-second'='2',
>  'fields.foo.kind'='sequence',
>  'fields.foo.start'='0',
>  'fields.foo.end'='19'
> )""")
> {code}
> This works:
> {code:python}
> t = t_env.sql_query("SELECT last_value(val), last_value(foo) AS lagfoo FROM 
> datagen ")
> t_env.execute_sql("CREATE TABLE output (a double, foo INT) WITH ('connector' 
> = 'print')")
> t.execute_insert("output")
> {code}
> This doesn't work:
> {code:python}
> t = t_env.sql_query("SELECT last_value(val) OVER w, last_value(foo) OVER w AS 
> lagfoo FROM datagen WINDOW w AS (ORDER BY message_time ROWS BETWEEN 1 
> PRECEDING AND CURRENT ROW)")
> t_env.execute_sql("CREATE TABLE output (a double, foo INT) WITH ('connector' 
> = 'print')")
> t.execute_insert("output")
> {code}
> The resulting stacktrace:
> {code:java}
> java.lang.ClassCastException: java.lang.Integer cannot be cast to 
> java.lang.Double
>         at 
> org$apache$flink$table$planner$functions$aggfunctions$LastValueWithRetractAggFunction$LastValueWithRetractAccumulator$Converter.toInternal(Unknown
>  Source) ~[?:?]
>         at 
> org.apache.flink.table.data.conversion.StructuredObjectConverter.toInternal(StructuredObjectConverter.java:92)
>  ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
>         at 
> org.apache.flink.table.data.conversion.StructuredObjectConverter.toInternal(StructuredObjectConverter.java:47)
>  ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
>         at 
> org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:59)
>  ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
>         at BoundedOverAggregateHelper$43.getAccumulators(Unknown Source) 
> ~[?:?]
>         at 
> org.apache.flink.table.runtime.operators.over.RowTimeRowsBoundedPrecedingFunction.onTimer(RowTimeRowsBoundedPrecedingFunction.java:303)
>  ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
>         at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:94)
>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:72)
>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         at 
> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:276)
>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:183)
>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:600)
>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:199)
>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         at 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         at 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:95)
>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         at 
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:181)
>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         at 
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:152)
>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:577)
>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:541)
>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) 
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) 
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_141]
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to