[ 
https://issues.apache.org/jira/browse/FLINK-23277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Roman Khachatryan updated FLINK-23277:
--------------------------------------
    Description: 
Upon recovery, the changelog backend requests states in order to apply changes.
The TTL config is not available at that point, so states are created without 
regard to the TTL config.
One solution is to serialize the TTL config along with the metadata (in the changelog).

Note: values are already serialized as TTL values, and serializers as TTL 
serializers.

{code}
Caused by: java.lang.ClassCastException: org.apache.flink.runtime.state.ttl.TtlValue cannot be cast to org.apache.flink.table.data.RowData
   at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:129)
   at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43)
   at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
   at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:228)
   at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
   at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
   at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
   at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:428)
   at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
   at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:691)
   at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:646)
   at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:657)
   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:630)
   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
   at java.lang.Thread.run(Thread.java:748)
{code}

(doesn't affect test stability as changelog backend is currently disabled in 
tests)
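
A minimal sketch of the proposed solution (serializing the TTL config along with the state metadata) might look like the following. Everything here is hypothetical and for illustration only: the `StateMeta` class, its fields, and the byte layout are assumptions, not Flink's actual changelog metadata format or API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

class TtlMetadataSketch {

    /** Hypothetical per-state metadata record; not a Flink class. */
    static final class StateMeta {
        final String stateName;
        final boolean ttlEnabled;
        final long ttlMillis; // meaningful only when ttlEnabled is true

        StateMeta(String stateName, boolean ttlEnabled, long ttlMillis) {
            this.stateName = stateName;
            this.ttlEnabled = ttlEnabled;
            this.ttlMillis = ttlMillis;
        }

        /** Writes the TTL settings next to the state name. */
        void write(DataOutputStream out) throws IOException {
            out.writeUTF(stateName);
            out.writeBoolean(ttlEnabled);
            out.writeLong(ttlMillis);
        }

        /** Reads the fields back in the same order they were written. */
        static StateMeta read(DataInputStream in) throws IOException {
            String name = in.readUTF();
            boolean ttl = in.readBoolean();
            long millis = in.readLong();
            return new StateMeta(name, ttl, millis);
        }
    }

    static byte[] serialize(StateMeta meta) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            meta.write(out);
        }
        return bytes.toByteArray();
    }

    static StateMeta deserialize(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return StateMeta.read(in);
        }
    }

    public static void main(String[] args) throws IOException {
        // On recovery, the restored metadata tells the backend whether the
        // state must be created with TTL, so values are unwrapped correctly.
        StateMeta restored = deserialize(serialize(new StateMeta("groupAggState", true, 60_000L)));
        System.out.println(restored.stateName + " ttlEnabled=" + restored.ttlEnabled
                + " ttlMillis=" + restored.ttlMillis);
    }
}
```

With the TTL settings available in the restored metadata, the recovering backend could decide to create the state with TTL before applying changes, instead of handing out raw `TtlValue` objects to user code.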

  was:
Upon recovery, the changelog backend requests states in order to apply changes.
The TTL config is not available at that point, so states are created without 
regard to the TTL config.
One solution is to serialize the TTL config along with the metadata (in the changelog).

Note: values are already serialized as TTL values, and serializers as TTL 
serializers.

{code}
Caused by: java.lang.ClassCastException: org.apache.flink.runtime.state.ttl.TtlValue cannot be cast to org.apache.flink.table.data.RowData
   at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:129)
   at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43)
   at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
   at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:228)
   at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
   at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
   at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
   at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:428)
   at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
   at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:691)
   at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:646)
   at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:657)
   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:630)
   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
   at java.lang.Thread.run(Thread.java:748)
{code}



> Changelog backend doesn't apply TTL after recovery
> --------------------------------------------------
>
>                 Key: FLINK-23277
>                 URL: https://issues.apache.org/jira/browse/FLINK-23277
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>            Reporter: Roman Khachatryan
>            Assignee: Roman Khachatryan
>            Priority: Major
>             Fix For: 1.14.0



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
