[
https://issues.apache.org/jira/browse/FLINK-37240?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Eric Xiao updated FLINK-37240:
------------------------------
Description:
We observed that, in a SQL pipeline that involves a join, changing the
nullability of a column in either the output or the input schema is a
{*}state incompatible change{*}. Strangely enough, the same nullability change
made on a SQL pipeline without a join appears to be compatible.
{code:java}
java.lang.RuntimeException: Error while getting state
at
org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:109)
at
org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:232)
at
org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews$InputSideHasNoUniqueKey.<init>(JoinRecordStateViews.java:168)
at
org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews$InputSideHasNoUniqueKey.<init>(JoinRecordStateViews.java:154)
at
org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews.create(JoinRecordStateViews.java:65)
at
org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.open(StreamingJoinOperator.java:113)
at
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:703)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:679)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:646)
at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
at
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
at java.base/java.lang.Thread.run(Thread.java:831)
Caused by: org.apache.flink.util.StateMigrationException: The new state
serializer (org.apache.flink.api.common.typeutils.base.MapSerializer@44603b97)
must not be incompatible with the old state serializer
(org.apache.flink.api.common.typeutils.base.MapSerializer@44603b97).
at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:704)
at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:624)
at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837)
at
org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
at
org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:73)
at
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:305)
at
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:356)
at
org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)
at
org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:106)
{code}
----
To reproduce these situations:
For the following pipeline without a join, changing the nullability of
`right_varchar` between job restores {*}is state compatible{*}:
{code:java}
INSERT INTO `eric-schema-compatibility-output`
SELECT
right_id AS id,
right_int,
right_varchar
FROM `eric-schema-compatibility-input--right` as r{code}
For the following pipeline with a join, changing the nullability of
`right_varchar` between job restores is {*}not state compatible{*}:
{code:java}
INSERT INTO `eric-schema-compatibility-output_clone`
SELECT
left_id AS id,
left_int,
right_int,
right_varchar
FROM `eric-schema-compatibility-input--left` as l
JOIN `eric-schema-compatibility-input--right` as r
ON r.right_id = l.left_id{code}
was:
We observed that, in a SQL pipeline that involves a join, changing the
nullability of a column in either the output or the input schema is a
{*}state incompatible change{*}. Strangely enough, the same nullability change
made on a SQL pipeline without a join appears to be compatible.
{code:java}
java.lang.RuntimeException: Error while getting state
at
org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:109)
at
org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:232)
at
org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews$InputSideHasNoUniqueKey.<init>(JoinRecordStateViews.java:168)
at
org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews$InputSideHasNoUniqueKey.<init>(JoinRecordStateViews.java:154)
at
org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews.create(JoinRecordStateViews.java:65)
at
org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.open(StreamingJoinOperator.java:113)
at
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:703)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:679)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:646)
at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
at
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
at java.base/java.lang.Thread.run(Thread.java:831)
Caused by: org.apache.flink.util.StateMigrationException: The new state
serializer (org.apache.flink.api.common.typeutils.base.MapSerializer@44603b97)
must not be incompatible with the old state serializer
(org.apache.flink.api.common.typeutils.base.MapSerializer@44603b97).
at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:704)
at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:624)
at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837)
at
org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
at
org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:73)
at
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:305)
at
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:356)
at
org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)
at
org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:106)
{code}
----
To reproduce these situations:
For the following pipeline without a join, changing the nullability of
`right_varchar` between job restores {*}is state compatible{*}:
{code:java}
INSERT INTO `eric-schema-compatibility-output`
SELECT
right_id AS id,
right_int,
right_varchar
FROM `eric-schema-compatibility-input--right` as r{code}
For the following pipeline with a join, changing the nullability of
`right_varchar` between job restores is {*}not state compatible{*}:
{code:java}
INSERT INTO `eric-schema-compatibility-output_clone`
SELECT
left_id AS id,
left_int,
right_int,
right_varchar
FROM `eric-schema-compatibility-input--left` as l
JOIN `eric-schema-compatibility-input--right` as r
ON r.right_id = l.left_id{code}
> Changing column nullability throws StateMigrationException in a SQL pipeline
> with a JOIN
> ----------------------------------------------------------------------------------------
>
> Key: FLINK-37240
> URL: https://issues.apache.org/jira/browse/FLINK-37240
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Reporter: Eric Xiao
> Priority: Major
>
> We observed that, in a SQL pipeline that involves a join, changing the
> nullability of a column in either the output or the input schema is a
> {*}state incompatible change{*}. Strangely enough, the same nullability
> change made on a SQL pipeline without a join appears to be compatible.
> {code:java}
> java.lang.RuntimeException: Error while getting state
> at
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:109)
> at
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:232)
> at
> org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews$InputSideHasNoUniqueKey.<init>(JoinRecordStateViews.java:168)
> at
> org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews$InputSideHasNoUniqueKey.<init>(JoinRecordStateViews.java:154)
> at
> org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews.create(JoinRecordStateViews.java:65)
> at
> org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.open(StreamingJoinOperator.java:113)
> at
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:703)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:679)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:646)
> at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
> at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
> at java.base/java.lang.Thread.run(Thread.java:831)
> Caused by: org.apache.flink.util.StateMigrationException: The new state
> serializer
> (org.apache.flink.api.common.typeutils.base.MapSerializer@44603b97) must not
> be incompatible with the old state serializer
> (org.apache.flink.api.common.typeutils.base.MapSerializer@44603b97).
> at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:704)
> at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:624)
> at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837)
> at
> org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
> at
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:73)
> at
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:305)
> at
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:356)
> at
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)
> at
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:106)
> {code}
> ----
> To reproduce these situations:
> For the following pipeline without a join, changing the nullability of
> `right_varchar` between job restores {*}is state compatible{*}:
> {code:java}
> INSERT INTO `eric-schema-compatibility-output`
> SELECT
> right_id AS id,
> right_int,
> right_varchar
> FROM `eric-schema-compatibility-input--right` as r{code}
> For the following pipeline with a join, changing the nullability of
> `right_varchar` between job restores is {*}not state compatible{*}:
> {code:java}
> INSERT INTO `eric-schema-compatibility-output_clone`
> SELECT
> left_id AS id,
> left_int,
> right_int,
> right_varchar
> FROM `eric-schema-compatibility-input--left` as l
> JOIN `eric-schema-compatibility-input--right` as r
> ON r.right_id = l.left_id{code}
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)