Hm, I am sorry to hear this. I must have missed it in the docs that Beam
version upgrades can break Flink state. That is important information for
anyone wanting to use Beam on Flink in production.

So I guess there is no guarantee that another bump of the Flink version will
not break things until Flink reaches 1.7.
Even then, things might still break? Is there a plan to make the Flink runner
more robust and to catch compatibility issues early with tests?
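
To make sure I understand Max's serialVersionUID point, here is a minimal
sketch in plain Java of the kind of check I have in mind. It is not Beam's
CoderTypeSerializer; SerialVersionSketch and PinnedSerializer are made-up
names, and I am only assuming that pinning the UID is part of the fix.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class SerialVersionSketch {

    // Hypothetical stand-in for a serializer object that ends up in a savepoint.
    static class PinnedSerializer implements Serializable {
        // Declared explicitly, so compatible class changes keep the same UID.
        private static final long serialVersionUID = 1L;
        final String coderName;

        PinnedSerializer(String coderName) {
            this.coderName = coderName;
        }
    }

    // The UID that Java serialization records in the stream for this class.
    static long streamUid(Class<?> cls) {
        return ObjectStreamClass.lookup(cls).getSerialVersionUID();
    }

    // Serialize to bytes and read back, as a restore would have to do.
    static Object roundTrip(Serializable value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Round trip failed", e);
        }
    }

    public static void main(String[] args) {
        // A compatibility test could fail the build if this value ever changes.
        System.out.println("UID: " + streamUid(PinnedSerializer.class));
        PinnedSerializer copy = (PinnedSerializer) roundTrip(new PinnedSerializer("kv"));
        System.out.println("Restored coder name: " + copy.coderName);
    }
}
```

Without the explicit field, Java derives the UID from the class details, so
even a small change to the class can make previously written bytes unreadable.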

Just trying to figure out my options with upgrades. Do other runners
suffer from the same weak guarantees?
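
For reference, this is how I read Stephan's "backwards compatibility
restore" description quoted below: the restore path keeps the old read logic
and converts old state into the new shape. A rough sketch in plain Java, not
actual Flink or Beam code; the class name, version tags and state layout are
invented for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class VersionedRestoreSketch {

    static final int V1 = 1; // hypothetical old layout: counter stored as int
    static final int V2 = 2; // hypothetical new layout: counter stored as long

    // Writer of the old release, kept here only to produce old-format bytes.
    static byte[] writeV1(int count) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                out.writeInt(V1);
                out.writeInt(count);
            }
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Writer of the new release.
    static byte[] writeV2(long count) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                out.writeInt(V2);
                out.writeLong(count);
            }
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Restore reads the version tag first and keeps the old code path around,
    // so state written by the old release is converted into the new shape.
    static long restore(byte[] snapshot) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(snapshot))) {
            int version = in.readInt();
            switch (version) {
                case V1:
                    return in.readInt(); // old int value widened to long
                case V2:
                    return in.readLong();
                default:
                    throw new IllegalStateException("Unknown snapshot version " + version);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("Restored from old format: " + restore(writeV1(42)));
        System.out.println("Restored from new format: " + restore(writeV2(42L)));
    }
}
```

If the old branch is dropped from restore(), old snapshots fail to load, which
looks like the situation I hit with my savepoint.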


On Tue, Aug 21, 2018 at 9:25 PM Stephan Ewen <se...@apache.org> wrote:

> Flink 1.7 will change the way the "restore serializer" is handled, which
> should make it much easier to handle such cases.
> In particular, breaking the Java class version format will no longer be an issue.
>
> That should help to make it easier to give the Beam-on-Flink runner cross
> version compatibility.
>
>
> On Mon, Aug 20, 2018 at 6:46 PM, Maximilian Michels <m...@apache.org>
> wrote:
>
>> AFAIK the serializer used here is the CoderTypeSerializer which may not
>> be recoverable because of changes to the contained Coder
>> (TaggedKvCoder). It doesn't currently have a serialVersionUID, so even
>> small changes could break serialization backwards-compatibility.
>>
>> As of now Beam doesn't offer the same upgrade guarantees as Flink [1].
>> This should be improved for the next release.
>>
>> Thanks,
>> Max
>>
>> [1]
>>
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/upgrading.html#compatibility-table
>>
>> On 20.08.18 17:46, Stephan Ewen wrote:
>> > Hi Jozef!
>> >
>> > When restoring state, the serializer that created the state must still
>> > be available, so the state can be read.
>> > It looks like some serializer classes were removed between Beam versions
>> > (or changed in an incompatible manner).
>> >
>> > Backwards compatibility of an operator implementation needs cooperation
>> > from the operator. Within Flink itself, when we change the way an
>> > operator uses state, we keep the old codepath and classes in a
>> > "backwards compatibility restore" that takes the old state and brings it
>> > into the shape of the new state.
>> >
>> > I am not deeply into the details of how Beam and the Flink runner implement
>> > their use of state, but it looks like this part is not present, which could
>> > mean that savepoints taken from Beam applications are not backwards
>> > compatible.
>> >
>> >
>> > On Mon, Aug 20, 2018 at 4:03 PM, Jozef Vilcek <jozo.vil...@gmail.com
>> > <mailto:jozo.vil...@gmail.com>> wrote:
>> >
>> >     Hello,
>> >
>> >     I am attempting to upgrade a Beam app from 2.5.0 running on Flink
>> >     1.4.0 to Beam 2.6.0 running on Flink 1.5.0. I am not aware of any
>> >     state migration changes needed for Flink 1.4.0 -> 1.5.0, so I am just
>> >     starting the new app with updated libs from a Flink savepoint captured
>> >     by the previous version of the app.
>> >
>> >     There is no change in topology. The job is accepted without error by
>> >     the new cluster, which suggests that all operators are matched with
>> >     state based on IDs. However, the app runs only a few seconds and then
>> >     crashes with:
>> >
>> >     java.lang.Exception: Exception while creating StreamOperatorStateContext.
>> >       at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
>> >       at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227)
>> >       at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:730)
>> >       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:295)
>> >       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>> >       at java.lang.Thread.run(Thread.java:745)
>> >     Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for DoFnOperator_43996aa2908fa46bb50160f751f8cc09_(1/100) from any of the 1 provided restore options.
>> >       at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>> >       at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:240)
>> >       at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:139)
>> >       ... 5 more
>> >     Caused by: java.io.IOException: Unable to restore operator state [bundle-buffer-tag]. The previous serializer of the operator state must be present; the serializer could have been removed from the classpath, or its implementation have changed and could not be loaded. This is a temporary restriction that will be fixed in future versions.
>> >       at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:514)
>> >       at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:63)
>> >       at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
>> >       at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
>> >       ... 7 more
>> >
>> >
>> >     Does this mean anything to anyone? Am I doing anything wrong, or did
>> >     FlinkRunner change in some way? The mentioned "bundle-buffer-tag"
>> >     seems to be too deep inside the runner internals for me to reach.
>> >
>> >     Any help is much appreciated.
>> >
>> >     Best,
>> >     Jozo
>> >
>> >
>>
>
>
