The reasons we use Java serialization are not fundamental, probably only historical. Thinking about it, there is actually a lucky coincidence in that we have to change the serialization anyway for Flink 1.17 support: Flink 1.17 removes the legacy Java serialization from Flink and enforces custom serialization. We therefore need to introduce an upgrade-compatible change of serialization to support Flink 1.17. This is already implemented in [1]. The PR can go further, though: we can replace the Java serialization of the Coder in the TypeSerializerSnapshot with the portable representation of the Coder (which will still use Java serialization in some cases, but might avoid it at least for well-known coders; moreover, Coders should be more upgrade-stable classes).
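To illustrate the idea (this is only a sketch, not the actual Beam/Flink API — the registry, method names, and return types here are hypothetical; the URNs are Beam's standard coder URNs): the snapshot would persist a stable URN plus a payload instead of a Java-serialized object, so repackaging a class no longer invalidates stored state.

```java
import java.io.*;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Sketch: a snapshot format keyed by a stable URN instead of a Java class name.
public class UrnSnapshotSketch {
    // Registry mapping well-known coder URNs to factories. Rename-safe, because
    // the URN, not the Java class name, is what gets persisted.
    static final Map<String, Supplier<Object>> REGISTRY = new HashMap<>();
    static {
        REGISTRY.put("beam:coder:varint:v1", () -> "VarIntCoder");
        REGISTRY.put("beam:coder:bytes:v1", () -> "ByteArrayCoder");
    }

    static byte[] writeSnapshot(String urn, byte[] payload) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bos)) {
            out.writeUTF(urn);            // stable identifier, survives repackaging
            out.writeInt(payload.length); // component/parameter payload, if any
            out.write(payload);
        }
        return bos.toByteArray();
    }

    static Object readSnapshot(byte[] snapshot) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(snapshot))) {
            String urn = in.readUTF();
            byte[] payload = new byte[in.readInt()];
            in.readFully(payload);
            Supplier<Object> factory = REGISTRY.get(urn);
            if (factory == null) {
                throw new IOException("Unknown coder URN: " + urn);
            }
            return factory.get();
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] snap = writeSnapshot("beam:coder:varint:v1", new byte[0]);
        System.out.println(readSnapshot(snap));
    }
}
```

Non-well-known coders would still need a Java-serialized fallback payload, but well-known ones become immune to class renames.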

I'll try to restore SerializablePipelineOptions (copy&paste) in the FlinkRunner only and rework the serialization in a more stable way (at the very least, avoid serializing the CoderTypeSerializer, which references the SerializablePipelineOptions).

I created [2] and marked it as blocker for 2.55.0 release, because otherwise we would break the upgrade.

Thanks for the discussion, it helped a lot.

 Jan

[1] https://github.com/apache/beam/pull/30197

[2] https://github.com/apache/beam/issues/30385

On 2/21/24 20:33, Kenneth Knowles wrote:
Yea I think we should restore the necessary classes but also fix the FlinkRunner. Java serialization is inherently self-update-incompatible.
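
A minimal self-contained demonstration of why (the Payload class is hypothetical, purely for illustration): Java serialization bakes the fully-qualified class name into the stream, so any repackaging or rename breaks deserialization of previously stored bytes with ClassNotFoundException — exactly what happened with SerializablePipelineOptions.

```java
import java.io.*;
import java.nio.charset.StandardCharsets;

public class SerializedNameDemo {
    // A stand-in for any class we Java-serialize into a savepoint.
    static class Payload implements Serializable {
        private static final long serialVersionUID = 1L;
        int value = 42;
    }

    // Serialize an object and return the raw stream bytes.
    static byte[] serialize(Serializable obj) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] bytes = serialize(new Payload());
        // ISO-8859-1 maps bytes one-to-one, so the embedded ASCII class
        // descriptor is visible in the decoded string.
        String dump = new String(bytes, StandardCharsets.ISO_8859_1);
        // The stream literally contains the fully-qualified class name;
        // rename or repackage the class and every stored copy becomes unreadable.
        System.out.println(dump.contains("SerializedNameDemo$Payload"));
    }
}
```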

On Wed, Feb 21, 2024 at 1:35 PM Reuven Lax via dev <dev@beam.apache.org> wrote:

    Is there a fundamental reason we serialize Java classes into Flink
    savepoints?

    On Wed, Feb 21, 2024 at 9:51 AM Robert Bradshaw via dev
    <dev@beam.apache.org> wrote:

        We could consider merging the gradle targets without renaming the
        classpaths as an intermediate step.

        Optimistically, perhaps there's a small number of classes that
        we need to preserve (e.g. SerializablePipelineOptions looks like
        it was something specifically intended to be serialized); maybe
        that and a handful of others (that implement Serializable) could
        be left in their original packages for backwards-compatibility
        reasons?

        On Wed, Feb 21, 2024 at 7:32 AM Jan Lukavský <je...@seznam.cz>
        wrote:
        >
        > Hi,
        >
        > while implementing FlinkRunner for Flink 1.17 I tried to
        verify that a
        > running Pipeline is able to successfully upgrade from Flink
        1.16 to
        > Flink 1.17. There is some change regarding serialization
        needed for
        > Flink 1.17, so this was a concern. Unfortunately recently we
        merged
        > core-construction-java into SDK, which resulted in some
        classes being
        > repackaged. Unfortunately, we serialize some classes into
        Flink's
        > check/savepoints. The renaming of the class therefore ends
        with the
        > following exception trying to restore from the savepoint:
        >
        > Caused by: java.lang.ClassNotFoundException: org.apache.beam.runners.core.construction.SerializablePipelineOptions
        >     at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
        >     at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
        >     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:67)
        >     at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
        >     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:51)
        >     at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
        >     at org.apache.flink.util.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:192)
        >     at java.base/java.lang.Class.forName0(Native Method)
        >     at java.base/java.lang.Class.forName(Class.java:398)
        >     at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
        >     at org.apache.flink.util.InstantiationUtil$FailureTolerantObjectInputStream.readClassDescriptor(InstantiationUtil.java:251)
        >
        >
        > This means that no Pipeline will be able to successfully
        upgrade from
        > version prior to 2.55.0 to 2.55.0 (i.e. all Pipelines will
        have to be
        > restarted from scratch). I wanted to know how the community
        would feel
        > about that, this consequence probably was not clear when we
        merged the
        > artifacts. The only option would be to revert the merge and
        then try to
        > figure out how to avoid Java serialization in Flink's
        savepoints. That
        > would definitely be costly in terms of implementation and
        even more to
        > provide ways to transfer old savepoints to the new format
        (can be
        > possible using state processor API). I'm aware that Beam
        provides no
        > general guarantees about the upgrade compatibility, so it
        might be fine
        > to just ignore this, I just wanted to shout this out loud so
        that we can
        > make a deliberate decision.
        >
        > Best,
        >
        >   Jan
        >
