The reasons we use Java serialization are not fundamental, they are
probably only historical. Thinking about it, there is actually a lucky
coincidence: we currently have to change the serialization anyway
because of Flink 1.17 support. Flink 1.17 removes the legacy Java
serialization from Flink and enforces custom serialization, so we need
to introduce an upgrade-compatible change of serialization to support
Flink 1.17.
This is already implemented in [1]. The PR can go further, though: we
can replace the Java serialization of the Coder in the
TypeSerializerSnapshot with the portable representation of the Coder.
That will still use Java serialization in some cases, but it avoids it
at least for well-known coders; moreover, Coders should be more
upgrade-stable classes.
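
To make that concrete, here is a minimal sketch (not the actual PR) of
a TypeSerializerSnapshot that stores the Coder as portable proto bytes
instead of a Java-serialized object. The class name and the
toPortableBytes/fromPortableBytes helpers are hypothetical placeholders
for Beam's portable coder translation; the real code will differ:

import java.io.IOException;

import org.apache.beam.sdk.coders.Coder;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

public class PortableCoderSnapshot<T> implements TypeSerializerSnapshot<T> {

  private Coder<T> coder;

  @Override
  public int getCurrentVersion() {
    return 1;
  }

  @Override
  public void writeSnapshot(DataOutputView out) throws IOException {
    // Proto bytes carry no Java class names for the payload, so
    // repackaging the coder classes does not invalidate the snapshot.
    byte[] payload = toPortableBytes(coder);
    out.writeInt(payload.length);
    out.write(payload);
  }

  @Override
  public void readSnapshot(int version, DataInputView in, ClassLoader loader)
      throws IOException {
    byte[] payload = new byte[in.readInt()];
    in.readFully(payload);
    coder = fromPortableBytes(payload);
  }

  @Override
  public TypeSerializer<T> restoreSerializer() {
    // The real implementation would wrap the restored Coder in Beam's
    // CoderTypeSerializer.
    throw new UnsupportedOperationException("sketch only");
  }

  @Override
  public TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
      TypeSerializer<T> newSerializer) {
    return TypeSerializerSchemaCompatibility.compatibleAsIs();
  }

  // Hypothetical placeholders for Beam's portable coder translation.
  private static <T> byte[] toPortableBytes(Coder<T> coder) {
    throw new UnsupportedOperationException("wire to portable translation");
  }

  private static <T> Coder<T> fromPortableBytes(byte[] payload) {
    throw new UnsupportedOperationException("wire to portable translation");
  }
}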
I'll try to restore the SerializablePipelineOptions (as a copy&paste)
in the FlinkRunner only, and rework the serialization in a more stable
way (at the very least avoiding serializing the CoderTypeSerializer,
which references the SerializablePipelineOptions).
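
For illustration only, a rough sketch of what such a FlinkRunner-local
copy could look like, where Java serialization only ever sees a JSON
string so the options classes themselves stay out of the savepoint
(field names and the Jackson setup here are assumptions, not the final
code):

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.beam.sdk.options.PipelineOptions;

public class SerializablePipelineOptions implements Serializable {

  private static final long serialVersionUID = 1L;

  // Assumption: Beam's PipelineOptions round-trip through Jackson once
  // the relevant Jackson modules are registered.
  private static final ObjectMapper MAPPER =
      new ObjectMapper().registerModules(ObjectMapper.findModules());

  private final String json;
  private transient PipelineOptions options;

  public SerializablePipelineOptions(PipelineOptions options)
      throws IOException {
    this.options = options;
    this.json = MAPPER.writeValueAsString(options);
  }

  public PipelineOptions get() {
    return options;
  }

  private void readObject(ObjectInputStream is)
      throws IOException, ClassNotFoundException {
    is.defaultReadObject();
    // Only the JSON string travels through Java serialization; the
    // options object is rebuilt on restore.
    this.options = MAPPER.readValue(json, PipelineOptions.class);
  }
}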
I created [2] and marked it as a blocker for the 2.55.0 release,
because otherwise we would break the upgrade.
Thanks for the discussion, it helped a lot.
Jan
[1] https://github.com/apache/beam/pull/30197
[2] https://github.com/apache/beam/issues/30385
On 2/21/24 20:33, Kenneth Knowles wrote:
Yea I think we should restore the necessary classes but also fix the
FlinkRunner. Java serialization is inherently self-update-incompatible.
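
(A tiny standalone demo of that point, with made-up names: the
fully-qualified class name is written into the serialized bytes
themselves, so repackaging alone is enough to break restoration.)

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

public class RenameDemo {

  // Stand-in for any class that later gets moved to a new package.
  static class Options implements Serializable {
    private static final long serialVersionUID = 1L;
    String json = "{}";
  }

  public static void main(String[] args) throws Exception {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
      oos.writeObject(new Options());
    }
    // The class name is part of the stream, so deserialization does
    // Class.forName on the *old* name and fails after a repackage.
    String stream = new String(bos.toByteArray(), StandardCharsets.ISO_8859_1);
    System.out.println(stream.contains("RenameDemo$Options")); // prints true
  }
}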
On Wed, Feb 21, 2024 at 1:35 PM Reuven Lax via dev <dev@beam.apache.org> wrote:

Is there a fundamental reason we serialize Java classes into Flink
savepoints?
On Wed, Feb 21, 2024 at 9:51 AM Robert Bradshaw via dev <dev@beam.apache.org> wrote:

We could consider merging the gradle targets without renaming the
classpaths as an intermediate step.

Optimistically, perhaps there's only a small number of classes that we
need to preserve (e.g. SerializablePipelineOptions looks like it was
something specifically intended to be serialized); maybe that and a
handful of others (that implement Serializable) could be left in their
original packages for backwards-compatibility reasons?
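
(As a sketch of that idea, using the class name from the stack trace
below: the class simply stays at the fully-qualified name that old
savepoints recorded. Its serialVersionUID and field layout would have
to match what was originally written; the values here are placeholders,
not the real ones.)

package org.apache.beam.runners.core.construction;

import java.io.Serializable;

// Kept at its pre-merge location purely so old savepoints can resolve it.
public class SerializablePipelineOptions implements Serializable {

  // Placeholder: must equal the serialVersionUID of the original class.
  private static final long serialVersionUID = 1L;

  // Placeholder field: must match the original serialized field layout.
  private final String serializedPipelineOptions;

  public SerializablePipelineOptions(String serializedPipelineOptions) {
    this.serializedPipelineOptions = serializedPipelineOptions;
  }

  public String getSerializedPipelineOptions() {
    return serializedPipelineOptions;
  }
}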
On Wed, Feb 21, 2024 at 7:32 AM Jan Lukavský <je...@seznam.cz> wrote:
>
> Hi,
>
> while implementing FlinkRunner for Flink 1.17 I tried to verify that a
> running Pipeline is able to successfully upgrade from Flink 1.16 to
> Flink 1.17. There is some change regarding serialization needed for
> Flink 1.17, so this was a concern. Unfortunately, we recently merged
> core-construction-java into the SDK, which resulted in some classes
> being repackaged, and we serialize some of those classes into Flink's
> check/savepoints. The renaming of the classes therefore ends with the
> following exception when trying to restore from a savepoint:
>
> Caused by: java.lang.ClassNotFoundException:
> org.apache.beam.runners.core.construction.SerializablePipelineOptions
>     at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
>     at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:67)
>     at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:51)
>     at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>     at org.apache.flink.util.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:192)
>     at java.base/java.lang.Class.forName0(Native Method)
>     at java.base/java.lang.Class.forName(Class.java:398)
>     at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
>     at org.apache.flink.util.InstantiationUtil$FailureTolerantObjectInputStream.readClassDescriptor(InstantiationUtil.java:251)
>
> This means that no Pipeline will be able to successfully upgrade from
> a version prior to 2.55.0 to 2.55.0 (i.e. all Pipelines will have to
> be restarted from scratch). I wanted to know how the community would
> feel about that; this consequence probably was not clear when we
> merged the artifacts. The only option would be to revert the merge and
> then try to figure out how to avoid Java serialization in Flink's
> savepoints. That would definitely be costly in terms of
> implementation, and even more so to provide ways to transfer old
> savepoints to the new format (which might be possible using the state
> processor API). I'm aware that Beam provides no general guarantees
> about upgrade compatibility, so it might be fine to just ignore this;
> I just wanted to shout this out loud so that we can make a deliberate
> decision.
>
> Best,
>
> Jan
>
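
Coming back to the state processor API idea mentioned in the quoted
mail: a rough sketch of the reading half of such a migration, against
the DataStream-based API available in recent Flink versions (the uid,
state name, and types are made up, and the exact API surface varies
between Flink releases):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.state.api.SavepointReader;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class SavepointMigration {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

    // Read keyed state out of the old savepoint (with the old classes
    // still on the classpath), producing plain records.
    SavepointReader savepoint =
        SavepointReader.read(
            env, "hdfs://path/to/old-savepoint", new HashMapStateBackend());

    DataStream<String> oldState =
        savepoint.readKeyedState("operator-uid", new ReaderFn());
    oldState.print();

    env.execute("read-old-savepoint");
  }

  static class ReaderFn extends KeyedStateReaderFunction<String, String> {
    ValueState<String> state;

    @Override
    public void open(Configuration parameters) {
      state = getRuntimeContext().getState(
          new ValueStateDescriptor<>("state-name", String.class));
    }

    @Override
    public void readKey(String key, Context ctx, Collector<String> out)
        throws Exception {
      out.collect(key + "=" + state.value());
    }
  }
}

Writing the transformed records into a fresh savepoint would then go
through SavepointWriter/OperatorTransformation, which is omitted here.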