[ 
https://issues.apache.org/jira/browse/FLINK-37240?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eric Xiao updated FLINK-37240:
------------------------------
    Description: 
We observed that in a SQL pipeline involving a join, changing the nullability of
a column in either the input or the output schema is a {*}state-incompatible
change{*}: restoring the job fails with the exception below. Strangely enough,
the same nullability change made on a SQL pipeline without a join appears to be
state compatible.
{code:java}
java.lang.RuntimeException: Error while getting state
        at org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:109)
        at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:232)
        at org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews$InputSideHasNoUniqueKey.<init>(JoinRecordStateViews.java:168)
        at org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews$InputSideHasNoUniqueKey.<init>(JoinRecordStateViews.java:154)
        at org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews.create(JoinRecordStateViews.java:65)
        at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.open(StreamingJoinOperator.java:113)
        at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:703)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:679)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:646)
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
        at java.base/java.lang.Thread.run(Thread.java:831)
Caused by: org.apache.flink.util.StateMigrationException: The new state serializer (org.apache.flink.api.common.typeutils.base.MapSerializer@44603b97) must not be incompatible with the old state serializer (org.apache.flink.api.common.typeutils.base.MapSerializer@44603b97).
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:704)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:624)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837)
        at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
        at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:73)
        at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:305)
        at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:356)
        at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)
        at org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:106)
 {code}
----
To reproduce these situations:

Changing the nullability of `right_varchar` between job restores {*}is state
compatible{*} in this pipeline without a join:
{code:java}
INSERT INTO `eric-schema-compatibility-output`
SELECT 
    right_id AS id,
    right_int,
    right_varchar
FROM `eric-schema-compatibility-input--right` as r{code}
Changing the nullability of `right_varchar` between job restores is {*}not
state compatible{*} in this pipeline with a join:
{code:java}
INSERT INTO `eric-schema-compatibility-output_clone`
SELECT 
    left_id AS id,
    left_int,
    right_int,
    right_varchar
FROM `eric-schema-compatibility-input--left` as l
JOIN `eric-schema-compatibility-input--right` as r
ON r.right_id = l.left_id{code}
 

  was:
We observed that in a SQL pipeline involving a join, changing the nullability of
a column in either the input or the output schema is a {*}state-incompatible
change{*}: restoring the job fails with the exception below. Strangely enough,
the same nullability change made on a SQL pipeline without a join appears to be
state compatible.
{code:java}
java.lang.RuntimeException: Error while getting state
        at org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:109)
        at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:232)
        at org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews$InputSideHasNoUniqueKey.<init>(JoinRecordStateViews.java:168)
        at org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews$InputSideHasNoUniqueKey.<init>(JoinRecordStateViews.java:154)
        at org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews.create(JoinRecordStateViews.java:65)
        at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.open(StreamingJoinOperator.java:113)
        at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:703)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:679)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:646)
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
        at java.base/java.lang.Thread.run(Thread.java:831)
Caused by: org.apache.flink.util.StateMigrationException: The new state serializer (org.apache.flink.api.common.typeutils.base.MapSerializer@44603b97) must not be incompatible with the old state serializer (org.apache.flink.api.common.typeutils.base.MapSerializer@44603b97).
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:704)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:624)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837)
        at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
        at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:73)
        at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:305)
        at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:356)
        at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)
        at org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:106)
 {code}
----
To reproduce these situations:

Changing the nullability of `right_varchar` between job restores {*}is state
compatible{*} in this pipeline without a join:

 
{code:java}
INSERT INTO `eric-schema-compatibility-output`
SELECT 
    right_id AS id,
    right_int,
    right_varchar
FROM `eric-schema-compatibility-input--right` as r{code}
Changing the nullability of `right_varchar` between job restores is {*}not
state compatible{*} in this pipeline with a join:
{code:java}
INSERT INTO `eric-schema-compatibility-output_clone`
SELECT 
    left_id AS id,
    left_int,
    right_int,
    right_varchar
FROM `eric-schema-compatibility-input--left` as l
JOIN `eric-schema-compatibility-input--right` as r
ON r.right_id = l.left_id{code}
 


> Changing column nullability throws StateMigrationException in a SQL pipeline 
> with a JOIN
> ----------------------------------------------------------------------------------------
>
>                 Key: FLINK-37240
>                 URL: https://issues.apache.org/jira/browse/FLINK-37240
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>            Reporter: Eric Xiao
>            Priority: Major
>
> We observed that in a SQL pipeline involving a join, changing the nullability
> of a column in either the input or the output schema is a {*}state-incompatible
> change{*}: restoring the job fails with the exception below. Strangely enough,
> the same nullability change made on a SQL pipeline without a join appears to be
> state compatible.
> {code:java}
> java.lang.RuntimeException: Error while getting state
>       at org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:109)
>       at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:232)
>       at org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews$InputSideHasNoUniqueKey.<init>(JoinRecordStateViews.java:168)
>       at org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews$InputSideHasNoUniqueKey.<init>(JoinRecordStateViews.java:154)
>       at org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews.create(JoinRecordStateViews.java:65)
>       at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.open(StreamingJoinOperator.java:113)
>       at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:703)
>       at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:679)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:646)
>       at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>       at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>       at java.base/java.lang.Thread.run(Thread.java:831)
> Caused by: org.apache.flink.util.StateMigrationException: The new state serializer (org.apache.flink.api.common.typeutils.base.MapSerializer@44603b97) must not be incompatible with the old state serializer (org.apache.flink.api.common.typeutils.base.MapSerializer@44603b97).
>       at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:704)
>       at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:624)
>       at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837)
>       at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
>       at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:73)
>       at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:305)
>       at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:356)
>       at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)
>       at org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:106)
>  {code}
> ----
> To reproduce these situations:
> Changing the nullability of `right_varchar` between job restores {*}is state
> compatible{*} in this pipeline without a join:
> {code:java}
> INSERT INTO `eric-schema-compatibility-output`
> SELECT 
>     right_id AS id,
>     right_int,
>     right_varchar
> FROM `eric-schema-compatibility-input--right` as r{code}
> Changing the nullability of `right_varchar` between job restores is {*}not
> state compatible{*} in this pipeline with a join:
> {code:java}
> INSERT INTO `eric-schema-compatibility-output_clone`
> SELECT 
>     left_id AS id,
>     left_int,
>     right_int,
>     right_varchar
> FROM `eric-schema-compatibility-input--left` as l
> JOIN `eric-schema-compatibility-input--right` as r
> ON r.right_id = l.left_id{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to